samlax12's picture
Upload 25 files
20f7a0a verified
# modules/agent_builder/routes.py
from flask import Blueprint, request, jsonify
import os
import json
import time
import uuid
import requests
from config import STREAM_API_KEY, STREAM_BASE_URL, DEFAULT_MODEL
agent_builder_bp = Blueprint('agent_builder', __name__)
# 确保Agent存储目录存在
AGENTS_DIR = 'agents'
os.makedirs(AGENTS_DIR, exist_ok=True)
@agent_builder_bp.route('/create', methods=['POST'])
def create_agent():
"""创建新的Agent"""
try:
data = request.json
name = data.get('name')
description = data.get('description', '')
subject = data.get('subject', name) # 默认使用名称作为学科
instructor = data.get('instructor', '教师') # 默认使用教师作为指导者
plugins = data.get('plugins', [])
knowledge_bases = data.get('knowledge_bases', [])
workflow = data.get('workflow')
if not name:
return jsonify({
"success": False,
"message": "Agent名称不能为空"
}), 400
# 创建Agent ID
agent_id = f"agent_{uuid.uuid4().hex[:8]}_{int(time.time())}"
# 构建Agent配置
agent_config = {
"id": agent_id,
"name": name,
"description": description,
"subject": subject, # 添加学科
"instructor": instructor, # 添加指导者
"created_at": int(time.time()),
"plugins": plugins,
"knowledge_bases": knowledge_bases,
"workflow": workflow or {
"nodes": [],
"edges": []
},
"distributions": [],
"stats": {
"usage_count": 0,
"last_used": None
}
}
# 保存Agent配置
with open(os.path.join(AGENTS_DIR, f"{agent_id}.json"), 'w', encoding='utf-8') as f:
json.dump(agent_config, f, ensure_ascii=False, indent=2)
return jsonify({
"success": True,
"agent_id": agent_id,
"message": f"Agent '{name}' 创建成功"
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/list', methods=['GET'])
def list_agents():
"""获取所有Agent列表"""
try:
agents = []
for filename in os.listdir(AGENTS_DIR):
if filename.endswith('.json'):
with open(os.path.join(AGENTS_DIR, filename), 'r', encoding='utf-8') as f:
agent_config = json.load(f)
# 简化版本,只返回关键信息
agents.append({
"id": agent_config.get("id"),
"name": agent_config.get("name"),
"description": agent_config.get("description"),
"subject": agent_config.get("subject", agent_config.get("name")),
"instructor": agent_config.get("instructor", "教师"),
"created_at": agent_config.get("created_at"),
"plugins": agent_config.get("plugins", []),
"knowledge_bases": agent_config.get("knowledge_bases", []),
"usage_count": agent_config.get("stats", {}).get("usage_count", 0),
"distribution_count": len(agent_config.get("distributions", []))
})
# 按创建时间排序
agents.sort(key=lambda x: x.get("created_at", 0), reverse=True)
return jsonify({
"success": True,
"agents": agents
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/<agent_id>', methods=['GET'])
def get_agent(agent_id):
"""获取特定Agent的配置"""
try:
agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")
if not os.path.exists(agent_path):
return jsonify({
"success": False,
"message": "Agent不存在"
}), 404
with open(agent_path, 'r', encoding='utf-8') as f:
agent_config = json.load(f)
return jsonify({
"success": True,
"agent": agent_config
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/<agent_id>', methods=['PUT'])
def update_agent(agent_id):
"""更新Agent配置"""
try:
data = request.json
agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")
if not os.path.exists(agent_path):
return jsonify({
"success": False,
"message": "Agent不存在"
}), 404
# 读取现有配置
with open(agent_path, 'r', encoding='utf-8') as f:
agent_config = json.load(f)
# 更新允许的字段
if 'name' in data:
agent_config['name'] = data['name']
if 'description' in data:
agent_config['description'] = data['description']
if 'plugins' in data:
agent_config['plugins'] = data['plugins']
if 'knowledge_bases' in data:
agent_config['knowledge_bases'] = data['knowledge_bases']
if 'workflow' in data:
agent_config['workflow'] = data['workflow']
# 添加更新时间
agent_config['updated_at'] = int(time.time())
# 保存更新后的配置
with open(agent_path, 'w', encoding='utf-8') as f:
json.dump(agent_config, f, ensure_ascii=False, indent=2)
return jsonify({
"success": True,
"message": "Agent更新成功"
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/<agent_id>', methods=['DELETE'])
def delete_agent(agent_id):
"""删除Agent"""
try:
agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")
if not os.path.exists(agent_path):
return jsonify({
"success": False,
"message": "Agent不存在"
}), 404
# 删除Agent配置文件
os.remove(agent_path)
return jsonify({
"success": True,
"message": "Agent删除成功"
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/<agent_id>/distribute', methods=['POST'])
def distribute_agent(agent_id):
"""为Agent创建分发链接"""
try:
data = request.json
expires_in = data.get('expires_in', 0) # 0表示永不过期
agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")
if not os.path.exists(agent_path):
return jsonify({
"success": False,
"message": "Agent不存在"
}), 404
# 读取Agent配置
with open(agent_path, 'r', encoding='utf-8') as f:
agent_config = json.load(f)
# 创建访问令牌
token = uuid.uuid4().hex
# 计算过期时间
expiry = int(time.time() + expires_in) if expires_in > 0 else 0
# 创建分发记录
distribution = {
"id": f"dist_{uuid.uuid4().hex[:6]}",
"created_at": int(time.time()),
"token": token,
"expires_at": expiry,
"usage_count": 0
}
# 更新Agent配置
if "distributions" not in agent_config:
agent_config["distributions"] = []
agent_config["distributions"].append(distribution)
# 保存更新后的配置
with open(agent_path, 'w', encoding='utf-8') as f:
json.dump(agent_config, f, ensure_ascii=False, indent=2)
# 构建访问链接
access_link = f"/student/{agent_id}?token={token}"
return jsonify({
"success": True,
"distribution": {
"id": distribution["id"],
"link": access_link,
"token": token,
"expires_at": expiry
},
"message": "分发链接创建成功"
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
@agent_builder_bp.route('/ai-assist', methods=['POST'])
def ai_assisted_workflow():
"""使用AI辅助Agent工作流编排"""
try:
data = request.json
description = data.get('description', '')
subject = data.get('subject', '通用学科')
knowledge_bases = data.get('knowledge_bases', [])
plugins = data.get('plugins', [])
if not description:
return jsonify({
"success": False,
"message": "请提供Agent描述"
}), 400
# 获取所有可用的知识库
available_knowledge_bases = []
try:
# 获取知识库列表的API调用
kb_response = requests.get("https://samlax12-agent.hf.space/api/knowledge")
if kb_response.status_code == 200:
kb_data = kb_response.json()
if kb_data.get("success"):
for kb in kb_data.get("data", []):
available_knowledge_bases.append(kb["id"])
except:
# 如果API调用失败,使用默认值
pass
# 可用的插件
available_plugins = ["code", "visualization", "mindmap"]
# 构建提示
system_prompt = """你是一个专业的AI工作流设计师。你需要根据用户描述,设计一个适合教育场景的Agent工作流。
你不仅要设计工作流结构,还要推荐合适的知识库和插件。请确保工作流逻辑合理,能够满足用户的需求。
工作流应包含以下类型的节点:
1. 意图识别:识别用户输入的意图
2. 知识库查询:从指定知识库中检索信息
3. 插件调用:调用特定插件(如代码执行、3D可视化或思维导图)
4. 回复生成:生成最终回复
用户当前选择的知识库:{knowledge_bases}
系统中可用的知识库有:{available_knowledge_bases}
用户当前选择的插件:{plugins}
系统中可用的插件有:{available_plugins}
这个Agent的主题领域是:{subject}
重要:根据Agent描述,推荐最合适的知识库和插件。在推荐时,只能使用系统中真实可用的知识库和插件。
请返回三部分内容:
1. 推荐的知识库列表(只能从可用知识库中选择)
2. 推荐的插件列表(只能从可用插件中选择)
3. 完整的工作流JSON结构
JSON格式示例:
{{
"recommended_knowledge_bases": ["rag_knowledge1", "rag_knowledge2"],
"recommended_plugins": ["code", "visualization"],
"workflow": {{
"nodes": [
{{ "id": "node1", "type": "intent_recognition", "data": {{ "name": "意图识别" }} }},
{{ "id": "node2", "type": "knowledge_query", "data": {{ "name": "知识库查询", "knowledge_base_id": "rag_knowledge1" }} }},
{{ "id": "node3", "type": "plugin_call", "data": {{ "name": "调用代码执行插件", "plugin_id": "code" }} }},
{{ "id": "node4", "type": "generate_response", "data": {{ "name": "生成回复" }} }}
],
"edges": [
{{ "id": "edge1", "source": "node1", "target": "node2", "condition": "需要知识" }},
{{ "id": "edge2", "source": "node1", "target": "node3", "condition": "需要代码执行" }},
{{ "id": "edge3", "source": "node2", "target": "node4" }},
{{ "id": "edge4", "source": "node3", "target": "node4" }}
]
}}
}}
"""
system_prompt = system_prompt.format(
knowledge_bases=", ".join(knowledge_bases) if knowledge_bases else "无",
available_knowledge_bases=", ".join(available_knowledge_bases) if available_knowledge_bases else "无可用知识库",
plugins=", ".join(plugins) if plugins else "无",
available_plugins=", ".join(available_plugins),
subject=subject
)
# 使用流式API
try:
headers = {
"Authorization": f"Bearer {STREAM_API_KEY}",
"Content-Type": "application/json"
}
response = requests.post(
f"{STREAM_BASE_URL}/chat/completions",
headers=headers,
json={
"model": DEFAULT_MODEL,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"请为以下描述的教育Agent设计工作流并推荐知识库和插件:\n\n{description}"}
]
}
)
if response.status_code != 200:
return jsonify({
"success": False,
"message": f"Error code: {response.status_code} - {response.text}",
"workflow": create_default_workflow(),
"recommended_knowledge_bases": [],
"recommended_plugins": []
}), 200 # 返回200但包含错误信息和备用工作流
result = response.json()
content = result['choices'][0]['message']['content']
except Exception as api_error:
return jsonify({
"success": False,
"message": f"无法连接到AI模型服务: {str(api_error)}",
"workflow": create_default_workflow(),
"recommended_knowledge_bases": [],
"recommended_plugins": []
}), 200
# 查找JSON部分
import re
json_match = re.search(r'```json\n([\s\S]*?)\n```', content)
if json_match:
workflow_json = json_match.group(1)
else:
# 尝试直接解析整个内容
workflow_json = content
# 解析JSON
try:
result_data = json.loads(workflow_json)
except:
# 如果解析失败,使用正则表达式清理
workflow_json = re.sub(r'```json\n|\n```', '', content)
try:
result_data = json.loads(workflow_json)
except:
# 如果仍然解析失败,提取标准的JSON结构 { ... }
import re
json_patterns = re.findall(r'\{[\s\S]*?\}', content)
if json_patterns:
try:
# 尝试解析最长的JSON结构
longest_json = max(json_patterns, key=len)
result_data = json.loads(longest_json)
except:
# 仍然失败,返回默认值
return jsonify({
"success": True,
"message": "使用默认工作流(AI生成的JSON无效)",
"workflow": create_default_workflow(),
"recommended_knowledge_bases": [],
"recommended_plugins": []
})
else:
# 没有找到有效的JSON结构
return jsonify({
"success": True,
"message": "使用默认工作流(未找到JSON结构)",
"workflow": create_default_workflow(),
"recommended_knowledge_bases": [],
"recommended_plugins": []
})
# 提取推荐的知识库和插件
recommended_knowledge_bases = result_data.get("recommended_knowledge_bases", [])
recommended_plugins = result_data.get("recommended_plugins", [])
workflow = result_data.get("workflow", create_default_workflow())
# 验证推荐的知识库都存在
valid_knowledge_bases = []
for kb in recommended_knowledge_bases:
if kb in available_knowledge_bases:
valid_knowledge_bases.append(kb)
# 验证推荐的插件都存在
valid_plugins = []
for plugin in recommended_plugins:
if plugin in available_plugins:
valid_plugins.append(plugin)
return jsonify({
"success": True,
"workflow": workflow,
"recommended_knowledge_bases": valid_knowledge_bases,
"recommended_plugins": valid_plugins,
"message": "已成功创建工作流"
})
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({
"success": False,
"message": str(e)
}), 500
def create_default_workflow():
"""创建一个默认的空工作流"""
return {
"nodes": [],
"edges": []
}