Spaces:
Sleeping
Sleeping
| import argparse | |
| import ast | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| class ToolCall: | |
| name: str | |
| args: Dict[str, Any] | |
| class ToolResult: | |
| ok: bool | |
| output: str | |
| artifacts: List[str] | |
| def _tokenize(s: str) -> List[str]: | |
| s = s.lower() | |
| s = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", " ", s) | |
| parts = [p.strip() for p in s.split() if p.strip()] | |
| return parts | |
| def tool_search_kb(query: str, kb_dir: str, top_k: int = 4) -> ToolResult: | |
| base = Path(kb_dir) | |
| if not base.exists(): | |
| return ToolResult(ok=True, output="[]", artifacts=[]) | |
| q_tokens = _tokenize(query) | |
| if not q_tokens: | |
| return ToolResult(ok=True, output="[]", artifacts=[]) | |
| hits: List[Tuple[int, str, str]] = [] | |
| for p in sorted(base.glob("**/*.md")): | |
| try: | |
| text = p.read_text(encoding="utf-8") | |
| except Exception: | |
| continue | |
| chunks = [c.strip() for c in re.split(r"\n\s*\n+", text) if c.strip()] | |
| for c in chunks: | |
| c_tokens = _tokenize(c) | |
| score = sum(1 for t in q_tokens if t in c_tokens) | |
| if score <= 0: | |
| continue | |
| hits.append((score, str(p), c)) | |
| hits.sort(key=lambda x: (-x[0], x[1])) | |
| picked = hits[: max(0, int(top_k))] | |
| out = [{"score": s, "source": src, "text": txt} for (s, src, txt) in picked] | |
| return ToolResult(ok=True, output=json.dumps(out, ensure_ascii=False, indent=2), artifacts=[]) | |
| class _SafeEval(ast.NodeVisitor): | |
| def visit(self, node): | |
| return super().visit(node) | |
| def generic_visit(self, node): | |
| raise ValueError(f"unsupported: {node.__class__.__name__}") | |
| def visit_Expression(self, node: ast.Expression): | |
| return self.visit(node.body) | |
| def visit_BinOp(self, node: ast.BinOp): | |
| left = self.visit(node.left) | |
| right = self.visit(node.right) | |
| if isinstance(node.op, ast.Add): | |
| return left + right | |
| if isinstance(node.op, ast.Sub): | |
| return left - right | |
| if isinstance(node.op, ast.Mult): | |
| return left * right | |
| if isinstance(node.op, ast.Div): | |
| return left / right | |
| if isinstance(node.op, ast.FloorDiv): | |
| return left // right | |
| if isinstance(node.op, ast.Mod): | |
| return left % right | |
| if isinstance(node.op, ast.Pow): | |
| return left ** right | |
| raise ValueError("unsupported operator") | |
| def visit_UnaryOp(self, node: ast.UnaryOp): | |
| v = self.visit(node.operand) | |
| if isinstance(node.op, ast.UAdd): | |
| return +v | |
| if isinstance(node.op, ast.USub): | |
| return -v | |
| raise ValueError("unsupported unary operator") | |
| def visit_Constant(self, node: ast.Constant): | |
| if isinstance(node.value, (int, float)): | |
| return node.value | |
| raise ValueError("only numbers allowed") | |
| def tool_calc(expression: str) -> ToolResult: | |
| expr = expression.strip() | |
| if not re.fullmatch(r"[0-9\.\s\+\-\*\/\(\)\%\^]+", expr): | |
| return ToolResult(ok=False, output="表达式包含不允许的字符", artifacts=[]) | |
| expr = expr.replace("^", "**") | |
| try: | |
| tree = ast.parse(expr, mode="eval") | |
| value = _SafeEval().visit(tree) | |
| except Exception as e: | |
| return ToolResult(ok=False, output=f"计算失败: {e}", artifacts=[]) | |
| return ToolResult(ok=True, output=str(value), artifacts=[]) | |
| def tool_write_text(path: str, text: str) -> ToolResult: | |
| p = Path(path) | |
| p.parent.mkdir(parents=True, exist_ok=True) | |
| p.write_text(text, encoding="utf-8") | |
| return ToolResult(ok=True, output=f"wrote: {p}", artifacts=[str(p)]) | |
| def tool_read_text(path: str) -> ToolResult: | |
| p = Path(path) | |
| if not p.exists(): | |
| return ToolResult(ok=False, output="not found", artifacts=[]) | |
| return ToolResult(ok=True, output=p.read_text(encoding="utf-8"), artifacts=[]) | |
| def _extract_math_expr(goal: str) -> Optional[str]: | |
| m = re.search(r"(\d[\d\s\+\-\*\/\(\)\%\^\.]*\d)", goal) | |
| if not m: | |
| return None | |
| expr = m.group(1) | |
| expr = re.sub(r"\s+", "", expr) | |
| if len(expr) > 120: | |
| return None | |
| if not re.search(r"[\+\-\*\/\%\^]", expr): | |
| return None | |
| return expr | |
| def _extract_faq_count(goal: str) -> Optional[int]: | |
| m = re.search(r"(\d+)\s*条\s*FAQ", goal, flags=re.IGNORECASE) | |
| if m: | |
| return max(1, min(20, int(m.group(1)))) | |
| m = re.search(r"(\d+)\s*条", goal) | |
| if m and ("faq" in goal.lower() or "常见问题" in goal): | |
| return max(1, min(20, int(m.group(1)))) | |
| if "faq" in goal.lower() or "常见问题" in goal: | |
| return 5 | |
| return None | |
| def _norm_space(s: str) -> str: | |
| return re.sub(r"\s+", " ", (s or "").strip()) | |
| def _lead_to_goal(lead: Dict[str, Any]) -> str: | |
| company = _norm_space(str(lead.get("company") or "")) | |
| contact = _norm_space(str(lead.get("contact") or "")) | |
| role = _norm_space(str(lead.get("role") or "")) | |
| channel = _norm_space(str(lead.get("channel") or "")) | |
| product = _norm_space(str(lead.get("product") or "")) | |
| stage = _norm_space(str(lead.get("stage") or "")) | |
| pain = _norm_space(str(lead.get("pain_points") or "")) | |
| budget = _norm_space(str(lead.get("budget") or "")) | |
| timeline = _norm_space(str(lead.get("timeline") or "")) | |
| notes = _norm_space(str(lead.get("notes") or "")) | |
| parts: List[str] = [] | |
| parts.append("生成一份销售线索跟进方案(含补问清单、下一步任务清单、跟进话术:微信/电话/邮件、风险提示),并写进报告。") | |
| if company: | |
| parts.append(f"公司/组织:{company}") | |
| if contact or role: | |
| who = contact if contact else "(未提供姓名)" | |
| if role: | |
| who = f"{who}({role})" | |
| parts.append(f"联系人:{who}") | |
| if channel: | |
| parts.append(f"来源渠道:{channel}") | |
| if product: | |
| parts.append(f"关注产品:{product}") | |
| if stage: | |
| parts.append(f"当前阶段:{stage}") | |
| if pain: | |
| parts.append(f"主要诉求/痛点:{pain}") | |
| if budget: | |
| parts.append(f"预算:{budget}") | |
| if timeline: | |
| parts.append(f"期望时间:{timeline}") | |
| if notes: | |
| parts.append(f"备注:{notes}") | |
| return "\n".join(parts) | |
| def make_plan(goal: str, kb_dir: str, out_report: str) -> List[ToolCall]: | |
| plan: List[ToolCall] = [] | |
| plan.append(ToolCall(name="search_kb", args={"query": goal, "kb_dir": kb_dir, "top_k": 6})) | |
| expr = _extract_math_expr(goal) | |
| if expr: | |
| plan.append(ToolCall(name="calc", args={"expression": expr})) | |
| plan.append(ToolCall(name="write_report", args={"goal": goal, "out_report": out_report})) | |
| return plan | |
| def _format_faq_items(snippets: List[Dict[str, Any]], n: int) -> List[str]: | |
| pool: List[str] = [] | |
| for s in snippets: | |
| t = str(s.get("text", "")).strip() | |
| t = re.sub(r"\s+", " ", t) | |
| if len(t) >= 12: | |
| pool.append(t) | |
| if not pool: | |
| pool = [ | |
| "我们提供从获客到转化的自动化流程编排能力,减少重复操作。", | |
| "支持接入常见广告与社媒渠道,并统一沉淀线索与素材。", | |
| "关键数据可追踪,可用规则触发通知与任务分配。", | |
| "提供基础的权限与审计,方便多人协作与风控。", | |
| "支持将常见问题与业务规范整理进知识库,供智能体检索。", | |
| ] | |
| items: List[str] = [] | |
| i = 0 | |
| while len(items) < n: | |
| items.append(pool[i % len(pool)]) | |
| i += 1 | |
| return items | |
| def _render_report(goal: str, snippets: List[Dict[str, Any]], faq_count: Optional[int], math_expr: Optional[str], math_result: Optional[str]) -> str: | |
| lines: List[str] = [] | |
| lines.append("# 智能体生成报告") | |
| lines.append("") | |
| lines.append("## 目标") | |
| lines.append(goal.strip()) | |
| lines.append("") | |
| if snippets: | |
| lines.append("## 知识库检索摘要") | |
| for i, s in enumerate(snippets[:4], start=1): | |
| src = os.path.basename(str(s.get("source", ""))) | |
| txt = str(s.get("text", "")).strip().replace("\n", " ") | |
| txt = re.sub(r"\s+", " ", txt) | |
| lines.append(f"- 片段{i}({src}):{txt}") | |
| lines.append("") | |
| if faq_count: | |
| lines.append(f"## FAQ({faq_count} 条)") | |
| items = _format_faq_items(snippets, faq_count) | |
| for it in items: | |
| lines.append(f"- {it}") | |
| lines.append("") | |
| if math_expr: | |
| lines.append("## 计算结果") | |
| if math_result is None: | |
| lines.append(f"- 表达式:{math_expr}") | |
| lines.append("- 结果:计算失败") | |
| else: | |
| lines.append(f"- 表达式:{math_expr}") | |
| lines.append(f"- 结果:{math_result}") | |
| lines.append("") | |
| lines.append("## 结论") | |
| lines.append("已基于本地知识库完成检索,并生成可复核的报告工件。") | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _lead_sections(goal: str, snippets: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| lines = goal.splitlines() | |
| facts: Dict[str, str] = {} | |
| for ln in lines: | |
| if ":" in ln: | |
| k, v = ln.split(":", 1) | |
| k = _norm_space(k) | |
| v = _norm_space(v) | |
| if k and v: | |
| facts[k] = v | |
| company = facts.get("公司/组织", "") | |
| product = facts.get("关注产品", "") | |
| stage = facts.get("当前阶段", "") | |
| pain = facts.get("主要诉求/痛点", "") | |
| budget = facts.get("预算", "") | |
| timeline = facts.get("期望时间", "") | |
| summary = ";".join([p for p in [company, product, stage, pain] if p]) or "已记录线索信息。" | |
| questions: List[str] = [ | |
| "当前目标是什么(增长/获客/转化/复购/效率/风控)?优先级如何排序?", | |
| "现有流程里最耗时或最容易丢信息的环节是哪一步?", | |
| "线索来源结构与规模(渠道、日均量、转化率、客单价)大概是多少?", | |
| "团队协作方式(谁负责、如何交接、是否需要权限与审计)?", | |
| "期望在多长周期内看到可量化改进?评估指标是什么?", | |
| ] | |
| if budget: | |
| questions.append("预算口径(一次性/按月/按量)与采购流程(招采/合同/付款节点)?") | |
| if timeline: | |
| questions.append("上线时间点是否有外部约束(活动/发布/投放节奏)?") | |
| tasks: List[Dict[str, str]] = [ | |
| {"task": "补齐关键字段:目标、现状流程、线索规模、关键指标", "owner": "你", "due": "今天"}, | |
| {"task": "确认决策链路:需求方/使用方/审批方/预算方", "owner": "你", "due": "今天"}, | |
| {"task": "整理 3 个典型线索样本与跟进记录(脱敏)", "owner": "客户", "due": "本周"}, | |
| {"task": "对齐一版 SOP:触达→跟进→转化→复盘的关键动作与责任人", "owner": "你", "due": "本周"}, | |
| {"task": "输出落地方案:自动化触发点、权限与审计、指标口径", "owner": "你", "due": "本周"}, | |
| {"task": "安排演示/试用:用样本数据跑一遍闭环", "owner": "你", "due": "下周"}, | |
| ] | |
| wechat = "我先基于你们当前的线索流程整理一版“可落地的跟进方案+自动化触发点”,你看方便补充下:目标/线索规模/当前痛点三项吗?补齐后我今天给你一版可执行清单。" | |
| phone = "你好,我是这边负责增长流程自动化方案的。想快速了解下:你们目前线索从哪个渠道进来、谁负责跟进、最容易卡在哪一步?我这边把方案做成你们团队可直接执行的清单。" | |
| email = "\n".join( | |
| [ | |
| "主题:线索跟进闭环方案梳理(补问清单 + 可执行任务)", | |
| "", | |
| "你好,", | |
| "", | |
| "我已根据当前线索信息先拟了一版跟进方案(含补问清单、下一步任务、话术与风险点)。", | |
| "为保证方案能直接落地,麻烦补充 3 点:目标/线索规模/当前流程卡点(可简单一句话)。", | |
| "", | |
| "我收到后会在当天给出:", | |
| "1) 可执行的任务清单(负责人/截止时间)", | |
| "2) 自动化触发点建议", | |
| "3) 指标口径与复盘方式", | |
| "", | |
| "谢谢", | |
| ] | |
| ) | |
| risks = [ | |
| "对外触达需避免夸大承诺,话术以“可量化目标/可验证步骤”为准。", | |
| "涉及个人信息与线索数据时,需明确数据来源与留存周期,按权限最小化原则访问。", | |
| "对接渠道 API/表单时,先做字段映射与去重规则,避免重复触达与口径混乱。", | |
| ] | |
| cited: List[str] = [] | |
| for s in snippets[:3]: | |
| src = os.path.basename(str(s.get("source", ""))) | |
| txt = _norm_space(str(s.get("text", "")).replace("\n", " ")) | |
| if txt: | |
| cited.append(f"{src}:{txt}") | |
| return { | |
| "summary": summary, | |
| "questions": questions[:7], | |
| "tasks": tasks, | |
| "scripts": {"wechat": wechat, "phone": phone, "email": email}, | |
| "risks": risks, | |
| "cited": cited, | |
| } | |
| def _render_lead_report(goal: str, snippets: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]: | |
| data = _lead_sections(goal=goal, snippets=snippets) | |
| lines: List[str] = [] | |
| lines.append("# 线索跟进方案") | |
| lines.append("") | |
| lines.append("## 线索摘要") | |
| lines.append(data["summary"]) | |
| lines.append("") | |
| lines.append("## 需要补问(建议按顺序)") | |
| for q in data["questions"]: | |
| lines.append(f"- {q}") | |
| lines.append("") | |
| lines.append("## 下一步任务清单") | |
| for t in data["tasks"]: | |
| lines.append(f"- {t['task']}(负责人:{t['owner']};截止:{t['due']})") | |
| lines.append("") | |
| lines.append("## 跟进话术") | |
| lines.append("### 微信/IM") | |
| lines.append(data["scripts"]["wechat"]) | |
| lines.append("") | |
| lines.append("### 电话开场") | |
| lines.append(data["scripts"]["phone"]) | |
| lines.append("") | |
| lines.append("### 邮件") | |
| lines.append(data["scripts"]["email"]) | |
| lines.append("") | |
| lines.append("## 风险提示") | |
| for r in data["risks"]: | |
| lines.append(f"- {r}") | |
| lines.append("") | |
| if data["cited"]: | |
| lines.append("## 参考知识片段") | |
| for c in data["cited"]: | |
| lines.append(f"- {c}") | |
| lines.append("") | |
| lines.append("## 原始目标") | |
| lines.append(goal.strip()) | |
| lines.append("") | |
| return "\n".join(lines), data | |
| def _needs_report(goal: str) -> bool: | |
| g = goal.lower() | |
| if "报告" in goal or "写进报告" in goal or "写入报告" in goal: | |
| return True | |
| if "report" in g: | |
| return True | |
| return True | |
| def check_closure(goal: str, out_report: str, mode: str) -> Tuple[bool, str]: | |
| p = Path(out_report) | |
| if _needs_report(goal): | |
| if not p.exists(): | |
| return False, "报告文件不存在" | |
| txt = p.read_text(encoding="utf-8") | |
| if mode == "lead_followup": | |
| if "线索跟进方案" not in txt: | |
| return False, "报告缺少标题" | |
| if "需要补问" not in txt or "下一步任务清单" not in txt or "跟进话术" not in txt: | |
| return False, "报告结构不完整" | |
| if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < 8: | |
| return False, "清单项不足" | |
| else: | |
| if "智能体生成报告" not in txt: | |
| return False, "报告缺少标题" | |
| faq_count = _extract_faq_count(goal) | |
| if faq_count: | |
| if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < faq_count: | |
| return False, "FAQ 条数不足" | |
| expr = _extract_math_expr(goal) | |
| if expr: | |
| if expr not in txt: | |
| return False, "报告未包含表达式" | |
| calc_res = tool_calc(expr) | |
| if calc_res.ok and str(calc_res.output) not in txt: | |
| return False, "报告未包含计算结果" | |
| return True, "ok" | |
| def run_agent(goal: str, kb_dir: str, out_report: str, max_rounds: int = 3, mode: str = "general") -> Dict[str, Any]: | |
| round_logs: List[Dict[str, Any]] = [] | |
| state: Dict[str, Any] = {"snippets": [], "math_expr": None, "math_result": None, "artifacts": [], "structured": None} | |
| for r in range(1, max_rounds + 1): | |
| plan = make_plan(goal=goal, kb_dir=kb_dir, out_report=out_report) | |
| step_logs: List[Dict[str, Any]] = [] | |
| for step in plan: | |
| if step.name == "search_kb": | |
| res = tool_search_kb(**step.args) | |
| if res.ok: | |
| try: | |
| state["snippets"] = json.loads(res.output) | |
| except Exception: | |
| state["snippets"] = [] | |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output[:800]}) | |
| continue | |
| if step.name == "calc": | |
| state["math_expr"] = step.args.get("expression") | |
| res = tool_calc(**step.args) | |
| state["math_result"] = res.output if res.ok else None | |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) | |
| continue | |
| if step.name == "write_report": | |
| faq_count = _extract_faq_count(goal) | |
| if mode == "lead_followup": | |
| report, structured = _render_lead_report(goal=goal, snippets=state.get("snippets") or []) | |
| state["structured"] = structured | |
| else: | |
| report = _render_report( | |
| goal=goal, | |
| snippets=state.get("snippets") or [], | |
| faq_count=faq_count, | |
| math_expr=state.get("math_expr"), | |
| math_result=state.get("math_result"), | |
| ) | |
| res = tool_write_text(path=out_report, text=report) | |
| state["artifacts"].extend(res.artifacts) | |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) | |
| continue | |
| step_logs.append({"tool": step.name, "ok": False, "output": "unknown tool"}) | |
| ok, reason = check_closure(goal=goal, out_report=out_report, mode=mode) | |
| round_logs.append({"round": r, "plan": [c.__dict__ for c in plan], "steps": step_logs, "check": {"ok": ok, "reason": reason}}) | |
| if ok: | |
| break | |
| final = [] | |
| final.append("已完成闭环执行:规划→工具执行→自检→产出。") | |
| final.append(f"报告工件:{out_report}") | |
| expr = state.get("math_expr") | |
| if expr: | |
| final.append(f"计算:{expr} = {state.get('math_result')}") | |
| faq_count = _extract_faq_count(goal) | |
| if faq_count: | |
| final.append(f"FAQ:{faq_count} 条(见报告)") | |
| return { | |
| "goal": goal, | |
| "out_report": out_report, | |
| "round_logs": round_logs, | |
| "final_answer": "\n".join(final), | |
| "artifacts": state.get("artifacts"), | |
| "structured": state.get("structured"), | |
| "mode": mode, | |
| } | |
| def run_agent_mode(mode: str, payload: Dict[str, Any], kb_dir: str, out_report: str, max_rounds: int = 3) -> Dict[str, Any]: | |
| mode = (mode or "general").strip() | |
| if mode == "lead_followup": | |
| lead = payload.get("lead") or {} | |
| goal = _lead_to_goal(lead if isinstance(lead, dict) else {}) | |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode=mode) | |
| return result | |
| goal = str(payload.get("goal") or "").strip() | |
| return run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode="general") | |
| def _print_round_logs(round_logs: List[Dict[str, Any]]) -> None: | |
| for r in round_logs: | |
| print(f"\n=== Round {r['round']} ===") | |
| print("Plan:") | |
| for s in r["plan"]: | |
| print(f"- {s['name']} {json.dumps(s['args'], ensure_ascii=False)}") | |
| print("Steps:") | |
| for s in r["steps"]: | |
| out = s["output"] | |
| out = out.replace("\n", " ") | |
| if len(out) > 180: | |
| out = out[:180] + "..." | |
| print(f"- {s['tool']}: ok={s['ok']} output={out}") | |
| print(f"Check: ok={r['check']['ok']} reason={r['check']['reason']}") | |
| def cmd_demo() -> int: | |
| here = Path(__file__).resolve().parent | |
| goal = "基于知识库,生成一份 5 条 FAQ,并计算 17*23 的结果写进报告" | |
| out_report = str(here / "out" / "demo_report.md") | |
| kb_dir = str(here / "kb") | |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) | |
| _print_round_logs(result["round_logs"]) | |
| print("\n=== Final ===") | |
| print(result["final_answer"]) | |
| try: | |
| txt = (here / "out" / "demo_report.md").read_text(encoding="utf-8") | |
| head = "\n".join(txt.splitlines()[:22]) | |
| print("\n=== Report Preview (Top) ===") | |
| print(head) | |
| except Exception: | |
| pass | |
| return 0 | |
| def cmd_run(goal: str) -> int: | |
| here = Path(__file__).resolve().parent | |
| out_report = str(here / "out" / "demo_report.md") | |
| kb_dir = str(here / "kb") | |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) | |
| _print_round_logs(result["round_logs"]) | |
| print("\n=== Final ===") | |
| print(result["final_answer"]) | |
| return 0 | |
| def main(argv: List[str]) -> int: | |
| p = argparse.ArgumentParser(prog="agent.py") | |
| sub = p.add_subparsers(dest="cmd", required=True) | |
| sub.add_parser("demo") | |
| p_run = sub.add_parser("run") | |
| p_run.add_argument("--goal", required=True) | |
| args = p.parse_args(argv) | |
| if args.cmd == "demo": | |
| return cmd_demo() | |
| if args.cmd == "run": | |
| return cmd_run(goal=args.goal) | |
| return 2 | |
| if __name__ == "__main__": | |
| raise SystemExit(main(sys.argv[1:])) | |