# LoongFlow / app.py
# FreshmanD's picture
# Update app.py
# e5124ae verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LoongFlow HuggingFace Spaces Demo
Demonstrates the PEES (Plan-Execute-Evaluate-Summary) evolutionary agent workflow.
"""
import gradio as gr
import pandas as pd
import time
import random
from typing import List, Dict, Any, Tuple
# ============================================================================
# PEES workflow simulation
# ============================================================================
def simulate_planner(task: str) -> Dict[str, Any]:
    """Simulate the Planner phase and return a canned strategic plan.

    The call pauses for 0.3 s to imitate work, picks one strategy sentence
    at random and renders a Markdown plan that quotes ``task``.

    Args:
        task: Task description supplied by the user.

    Returns:
        A dict with the keys ``role`` (always ``"Planner"``), ``thought``
        (the randomly chosen strategy sentence), ``plan`` (Markdown text with
        surrounding whitespace stripped) and ``timestamp`` (local time
        formatted as ``HH:MM:SS``).
    """
    time.sleep(0.3)

    candidate_thoughts = (
        "我将采用分治策略,把任务分解为多个子问题分别解决。",
        "首先进行需求分析,然后设计系统架构,最后逐步实现。",
        "使用迭代式开发,从最小可行产品开始,逐步添加功能。",
        "采用自顶向下的方法,先定义接口,再实现具体逻辑。",
    )
    chosen_thought = random.choice(candidate_thoughts)

    # The template lines stay at column 0 so the rendered Markdown carries no
    # leading whitespace; the surrounding newlines are stripped below.
    plan_markdown = f"""
## 任务分析
- 用户需求: {task}
## 战略规划
1. 理解任务本质和目标
2. 设计整体架构方案
3. 制定分步实施计划
4. 预留扩展和优化空间
"""

    outcome: Dict[str, Any] = {"role": "Planner"}
    outcome["thought"] = chosen_thought
    outcome["plan"] = plan_markdown.strip()
    outcome["timestamp"] = time.strftime("%H:%M:%S")
    return outcome
def simulate_executor(task: str, plan: str) -> Dict[str, Any]:
"""模拟第一个 Execute 阶段 - 实现代码"""
time.sleep(0.5)
code_samples = {
"todo": '''```python
# Todo List App - 实现
class TodoList:
def __init__(self):
self.tasks = []
def add_task(self, title, priority="medium"):
task = {
"id": len(self.tasks) + 1,
"title": title,
"priority": priority,
"done": False,
"created_at": datetime.now()
}
self.tasks.append(task)
return task
def complete_task(self, task_id):
for task in self.tasks:
if task["id"] == task_id:
task["done"] = True
return True
return False
def get_pending(self):
return [t for t in self.tasks if not t["done"]]
```''',
"file": '''```python
# File Processor - 实现
import os
import shutil
from pathlib import Path
class FileProcessor:
def __init__(self, input_dir, output_dir):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
def process_all(self):
results = []
for filepath in self.input_dir.rglob("*"):
if filepath.is_file():
dest = self.output_dir / filepath.relative_to(self.input_dir)
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(filepath, dest)
results.append({"file": str(filepath), "status": "copied"})
return results
```''',
"default": '''```python
# Solution Implementation - 实现
class Solution:
def __init__(self, task):
self.task = task
self.components = {}
def analyze(self):
"""分析任务需求"""
return {"requirements": "...", "constraints": "..."}
def design(self):
"""设计解决方案"""
return {"architecture": "...", "flow": "..."}
def implement(self):
"""实现代码"""
return {"code": "...", "tests": "..."}
def run(self):
return self.implement()
```'''
}
code = code_samples.get("default")
for key, c in code_samples.items():
if key in task.lower():
code = c
break
return {
"role": "Executor",
"action": "编写并执行实现代码",
"code": code,
"result": "代码实现完成",
"timestamp": time.strftime("%H:%M:%S")
}
def simulate_executor2(task: str, previous_result: str) -> Dict[str, Any]:
"""模拟第二个 Execute 阶段 - 验证测试"""
time.sleep(0.4)
test_samples = {
"todo": '''```python
# 测试用例
def test_todo_list():
todo = TodoList()
# 测试添加任务
task = todo.add_task("完成报告", "high")
assert task["title"] == "完成报告"
assert task["priority"] == "high"
# 测试完成任务
todo.complete_task(task["id"])
assert task["done"] == True
# 测试获取待办
pending = todo.get_pending()
assert len(pending) == 0
print("所有测试通过!")
```''',
"file": '''```python
# 测试用例
def test_file_processor():
processor = FileProcessor("input", "output")
# 创建测试文件
os.makedirs("input", exist_ok=True)
with open("input/test.txt", "w") as f:
f.write("test")
# 执行处理
results = processor.process_all()
# 验证结果
assert os.path.exists("output/test.txt")
assert len(results) == 1
print("所有测试通过!")
```''',
"default": '''```python
# 验证测试
def test_solution():
solution = Solution("task")
# 测试各个组件
analysis = solution.analyze()
assert analysis is not None
design = solution.design()
assert design is not None
result = solution.run()
assert result is not None
print("所有测试通过!")
```'''
}
test_code = test_samples.get("default")
for key, c in test_samples.items():
if key in task.lower():
test_code = c
break
return {
"role": "Executor2",
"action": "编写并运行测试用例",
"code": test_code,
"result": "测试执行完成",
"timestamp": time.strftime("%H:%M:%S")
}
def simulate_summary(iteration: int, score: float, target: float) -> Dict[str, Any]:
    """Simulate the Summary phase: derive a new score and a reflection.

    The new score depends on how far ``score`` is below ``target``:

    * gap above 0.3: a fast gain of 0.18-0.28;
    * gap above 0.1: 50-70 % of the gap plus a +/-0.05 oscillation;
    * otherwise: ``target`` plus 0.02-0.08, so the target is exceeded.

    The result is clamped to ``[0.15, 1.0]``. Positive wording is chosen when
    the new score is not lower than ``score``, negative wording otherwise.

    Args:
        iteration: Current iteration number (not used by the simulation).
        score: Score before this iteration.
        target: Score the run is trying to reach.

    Returns:
        A dict with the keys ``role`` (``"Summary"``), ``reflection``,
        ``improvement``, ``score`` (the new score) and ``timestamp`` (local
        time formatted as ``HH:MM:SS``).
    """
    time.sleep(0.3)

    praise = [
        "本次迭代成功实现了核心功能,分数有明显提升。",
        "代码结构良好,解决方案更优雅。",
        "测试覆盖完整,边界情况处理得当。",
        "验证通过,性能达到预期。",
    ]
    criticism = [
        "本次迭代遇到一些问题,分数略有下降。",
        "实现方案有缺陷,需要重新调整。",
        "某些边界情况未处理好,导致扣分。",
        "测试未完全通过,需要修复。",
    ]
    keep_doing = [
        "继续保持当前良好的实现方式",
        "建议扩展更多功能",
        "可以尝试更多边界情况",
    ]
    fix_next = [
        "需要修复实现的bug",
        "建议优化代码结构",
        "需要添加更多的错误处理",
        "考虑性能优化",
    ]

    remaining = target - score
    if remaining > 0.3:
        # Early phase: climb quickly.
        raw_score = score + random.uniform(0.18, 0.28)
    elif remaining > 0.1:
        # Middle phase: close 50-70 % of the gap, with a small wobble.
        step = remaining * random.uniform(0.5, 0.7)
        wobble = random.uniform(-0.05, 0.05)
        raw_score = score + step + wobble
    else:
        # Late phase: land just above the target.
        raw_score = target + random.uniform(0.02, 0.08)

    # Clamp to the displayable range.
    capped = min(1.0, raw_score)
    new_score = max(0.15, capped)

    improved = new_score >= score
    reflection_pool, improvement_pool = (
        (praise, keep_doing) if improved else (criticism, fix_next)
    )
    reflection = random.choice(reflection_pool)
    improvement = random.choice(improvement_pool)

    return {
        "role": "Summary",
        "reflection": reflection,
        "improvement": improvement,
        "score": new_score,
        "timestamp": time.strftime("%H:%M:%S")
    }
def run_pees_iteration(task: str, iteration: int, current_score: float, target: float) -> Tuple[List[Dict[str, Any]], float]:
    """Run one full PEES iteration and collect the per-phase records.

    The four simulated phases run in order: Plan, Execute (implementation),
    Evaluate (verification) and Summary.

    Args:
        task: Task description supplied by the user.
        iteration: 1-based iteration number, forwarded to the Summary phase.
        current_score: Score before this iteration.
        target: Score the run is trying to reach.

    Returns:
        A tuple ``(records, new_score)``. ``records`` holds one dict per
        phase, in execution order, with the keys ``phase``, ``phase_name``,
        ``content``, ``detail`` and ``timestamp``. ``new_score`` is the score
        reported by the Summary phase.
    """
    # Run the phases first; each one feeds the next.
    planned = simulate_planner(task)
    implemented = simulate_executor(task, planned["plan"])
    verified = simulate_executor2(task, implemented["result"])
    summarized = simulate_summary(iteration, current_score, target)

    # Detail text shown inside each phase's collapsible section.
    implementation_detail = f"{implemented['code']}\n\n执行结果: {implemented['result']}"
    verification_detail = f"{verified['code']}\n\n验证结果: {verified['result']}"
    summary_detail = f"改进建议: {summarized['improvement']}\n\n当前分数: {summarized['score']:.2f}"

    records = [
        {
            "phase": "Plan",
            "phase_name": "计划",
            "content": planned["thought"],
            "detail": planned["plan"],
            "timestamp": planned["timestamp"],
        },
        {
            "phase": "Execute",
            "phase_name": "执行",
            "content": implemented["action"],
            "detail": implementation_detail,
            "timestamp": implemented["timestamp"],
        },
        {
            "phase": "Evaluate",
            "phase_name": "验证",
            "content": verified["action"],
            "detail": verification_detail,
            "timestamp": verified["timestamp"],
        },
        {
            "phase": "Summary",
            "phase_name": "总结",
            "content": summarized["reflection"],
            "detail": summary_detail,
            "timestamp": summarized["timestamp"],
        },
    ]
    return records, summarized["score"]
# ============================================================================
# Gradio UI
# ============================================================================
def create_demo():
    """Build the Gradio Blocks interface for the PEES demo and return it.

    The layout consists of an introductory Markdown panel, a task textbox,
    sliders for the maximum iteration count (1-10) and the target score
    (0.5-1.0), a run button, a read-only status box, an HTML score chart, a
    hidden JSON component and one tab per PEES phase. Clicking the run button
    streams the output of the nested ``run_task`` generator into the UI.

    Returns:
        The ``gr.Blocks`` object created by this function (not yet launched).
    """
    # NOTE(review): the source had lost its indentation; the nesting of the
    # layout components below is reconstructed - confirm against the
    # rendered page.
    with gr.Blocks(title="LoongFlow PEES Demo", theme=gr.themes.Soft()) as demo:
        # Multi-line string bodies stay at column 0 so that the Markdown text
        # itself carries no leading whitespace.
        gr.Markdown("""
# LoongFlow PEES Agent Demo
**LoongFlow** 是一个进化式 Agent 开发框架,采用 **PEES (Plan-Execute-Evaluate-Summary)** 思考范式。
---
### PEES 工作流程
```
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Plan │ → │ Execute │ → │ Evaluate │ → │ Summary │
│ 计划 │ │ 执行 │ │ 验证 │ │ 总结 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
│ │
│ ◀──── 迭代改进 ────│
┌─────────┐
│ 目标达成 │
└─────────┘
```
- **Plan (P)**: 分析任务,制定战略计划
- **Execute (E1)**: 编写代码,实现功能
- **Evaluate (E2)**: 编写测试,验证功能
- **Summary (S)**: 反思结果,提取改进建议
""")
        with gr.Row():
            # Left column: task input and run controls.
            with gr.Column(scale=2):
                task_input = gr.Textbox(
                    label="输入任务描述",
                    placeholder="例如: 帮我写一个待办事项应用 / 创建一个文件处理工具",
                    lines=3
                )
                with gr.Row():
                    max_iterations = gr.Slider(
                        minimum=1, maximum=10, value=5, step=1,
                        label="最大迭代次数"
                    )
                    target_score = gr.Slider(
                        minimum=0.5, maximum=1.0, value=0.85, step=0.05,
                        label="目标分数"
                    )
                run_btn = gr.Button("开始执行任务", variant="primary")
            # Right column: read-only execution status.
            with gr.Column(scale=1):
                status_output = gr.Textbox(
                    label="执行状态",
                    lines=5,
                    interactive=False
                )
        # Score evolution - HTML container that run_task fills with an SVG chart
        score_display = gr.HTML(label="分数演进")
        # Score history (hidden; not wired to any event in this function)
        score_list = gr.JSON(label="分数历史", visible=False)
        gr.Markdown("### 迭代详情")
        # One tab per PEES phase
        with gr.Tabs():
            with gr.Tab("Plan 计划"):
                plan_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Execute 执行"):
                execute1_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Evaluate 验证"):
                execute2_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Summary 总结"):
                summary_output = gr.Markdown("*等待开始...*")

        def run_task(task: str, max_iter: int, target: float):
            """Run the simulated PEES loop, yielding UI updates as it goes.

            Each yielded 6-tuple is ``(status, score chart HTML, plan
            Markdown, execute Markdown, evaluate Markdown, summary
            Markdown)``, matching the ``outputs`` list passed to
            ``run_btn.click`` below. The loop runs at most ``int(max_iter)``
            iterations and stops early once the score reaches ``target``.
            """
            # Reject empty or whitespace-only input.
            if not task or not task.strip():
                yield "错误: 请输入任务描述", "", "", "", "", ""
                return
            chart_data = []
            current_score = 0.0
            empty_md = "*等待开始...*"
            empty_svg = '<svg width="400" height="250"><text x="200" y="130" text-anchor="middle" fill="#999">等待开始...</text></svg>'
            # Initial state
            yield "状态: 准备执行任务...", empty_svg, empty_md, empty_md, empty_md, empty_md
            for i in range(1, int(max_iter) + 1):
                # Run one complete iteration
                results, current_score = run_pees_iteration(task, i, current_score, target)
                # Pull out the result of each of the four phases
                plan_result = results[0]
                execute1_result = results[1]
                execute2_result = results[2]
                summary_result = results[3]
                # Format the output of each phase
                plan_md = f"""### 迭代 {i} - Plan 计划
**时间**: {plan_result['timestamp']}
{plan_result['content']}
<details>
<summary>查看计划详情</summary>
{plan_result['detail']}
</details>
"""
                exec1_md = f"""### 迭代 {i} - Execute 执行
**时间**: {execute1_result['timestamp']}
{execute1_result['content']}
<details>
<summary>查看实现代码</summary>
{execute1_result['detail']}
</details>
"""
                exec2_md = f"""### 迭代 {i} - Evaluate 验证
**时间**: {execute2_result['timestamp']}
{execute2_result['content']}
<details>
<summary>查看测试代码</summary>
{execute2_result['detail']}
</details>
"""
                summary_md = f"""### 迭代 {i} - Summary 总结
**时间**: {summary_result['timestamp']}
{summary_result['content']}
<details>
<summary>查看改进建议</summary>
{summary_result['detail']}
</details>
"""
                # Record the score for the chart
                chart_data.append({"iteration": i, "score": round(current_score, 2)})
                # Build the line chart as inline SVG
                if len(chart_data) == 1:
                    # Only one point so far: draw a single dot
                    svg = f'''
<svg width="400" height="250" style="border:1px solid #ccc; background:white;">
<text x="200" y="130" text-anchor="middle" fill="#666">分数: {chart_data[0]["score"]:.2f}</text>
<circle cx="50" cy="{200 - chart_data[0]["score"]*180}" r="8" fill="#22c55e"/>
</svg>
'''
                else:
                    # Several points: draw a polyline
                    width = 400
                    height = 250
                    padding = 40
                    plot_width = width - padding * 2
                    plot_height = height - padding * 2
                    # Build the SVG for the points and connecting lines
                    points_svg = ""
                    lines_svg = ""
                    for idx, item in enumerate(chart_data):
                        # Points are spread evenly across the plot width;
                        # a score of 1.0 maps to the top of the plot area.
                        x = padding + idx * (plot_width / (len(chart_data) - 1))
                        y = padding + plot_height - item["score"] * plot_height
                        points_svg += f'<circle cx="{x}" cy="{y}" r="6" fill="#22c55e" stroke="white" stroke-width="2"/>'
                        points_svg += f'<text x="{x}" y="{y-15}" text-anchor="middle" font-size="12" fill="#333">{item["score"]:.2f}</text>'
                        if idx > 0:
                            prev_x = padding + (idx - 1) * (plot_width / (len(chart_data) - 1))
                            prev_y = padding + plot_height - chart_data[idx-1]["score"] * plot_height
                            lines_svg += f'<line x1="{prev_x}" y1="{prev_y}" x2="{x}" y2="{y}" stroke="#22c55e" stroke-width="3"/>'
                    # Add the axis labels
                    svg = f'''
<svg width="{width}" height="{height}" style="border:1px solid #ccc; background:white; border-radius:8px;">
<!-- Y轴标签 -->
<text x="15" y="50" font-size="12" fill="#666">1.0</text>
<text x="15" y="{padding + plot_height/2}" font-size="12" fill="#666">0.5</text>
<text x="15" y="{height-20}" font-size="12" fill="#666">0.0</text>
<!-- X轴标签 -->
<text x="{width/2}" y="{height-5}" font-size="12" fill="#666" text-anchor="middle">迭代次数</text>
<!-- 折线 -->
{lines_svg}
{points_svg}
</svg>
'''
                # Update the UI after every iteration
                status = f"状态: 第 {i}/{int(max_iter)} 次迭代完成 (分数: {current_score:.2f})"
                yield status, svg, plan_md, exec1_md, exec2_md, summary_md
                # Stop once the target has been reached
                if current_score >= target:
                    break
                time.sleep(0.3)
            # NOTE(review): svg / plan_md / exec1_md / exec2_md / summary_md
            # are only bound inside the loop, so this yield would raise if
            # int(max_iter) were below 1. The slider's minimum=1 prevents that
            # from the UI - confirm no other caller can pass a smaller value.
            final_status = f"状态: 任务完成\n最终分数: {current_score:.2f}\n总迭代次数: {len(chart_data)}"
            yield final_status, svg, plan_md, exec1_md, exec2_md, summary_md

        # The outputs order must match the tuples yielded by run_task.
        run_btn.click(
            fn=run_task,
            inputs=[task_input, max_iterations, target_score],
            outputs=[status_output, score_display, plan_output, execute1_output, execute2_output, summary_output]
        )
        gr.Markdown("""
---
### 关于 LoongFlow
LoongFlow 是一个面向复杂任务的进化式 Agent 框架,特别适用于:
- **数学推理**: 开放式数学问题求解
- **机器学习**: AutoML 和算法优化
- **代码生成**: 复杂编程任务
- **科学研究**: 实验设计和分析
了解更多: [GitHub](https://github.com/baidu-baige/LoongFlow)
""")
    return demo
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    demo = create_demo()
    # Serve on host 0.0.0.0, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)