| """Entelechy Control Console - 观察和控制数字生命的控制台""" |
|
|
| import asyncio |
| import subprocess |
| import threading |
| from pathlib import Path |
|
|
| import gradio as gr |
|
|
| from main import DigitalLife |
|
|
| |
# Shared DigitalLife instance; stays None until _get_life() constructs it.
_life: DigitalLife | None = None
# Held by _get_life() while it constructs the shared instance.
_life_lock = threading.Lock()
# Event loop consulted by _run_async(); set by _get_loop() or by the
# background thread started in _start_background_life().
_loop: asyncio.AbstractEventLoop | None = None
|
|
|
|
def _get_life() -> DigitalLife:
    """Return the shared DigitalLife instance, constructing it on first use.

    The instance is stored in the module-global ``_life``. Construction
    happens while holding ``_life_lock``, and the global is re-checked after
    the lock is acquired so that only one instance is ever built.
    """
    global _life
    # Fast path: already built, no need to take the lock.
    if _life is not None:
        return _life
    with _life_lock:
        # Re-check under the lock: another thread may have built it meanwhile.
        if _life is None:
            _life = DigitalLife()
    return _life
|
|
|
|
def _get_loop() -> asyncio.AbstractEventLoop:
    """Return the module-level event loop, replacing it if missing or closed."""
    global _loop
    needs_fresh_loop = _loop is None or _loop.is_closed()
    if needs_fresh_loop:
        _loop = asyncio.new_event_loop()
    return _loop
|
|
|
|
def _run_async(coro):
    """Run *coro* to completion from synchronous code and return its result.

    The loop comes from ``_get_loop()``. If that loop is not running, the
    coroutine is driven directly on the calling thread. If it is already
    running, the coroutine is submitted to it thread-safely and the caller
    blocks for the result for at most 300 seconds.
    """
    event_loop = _get_loop()
    if not event_loop.is_running():
        return event_loop.run_until_complete(coro)
    pending = asyncio.run_coroutine_threadsafe(coro, event_loop)
    return pending.result(timeout=300)
|
|
|
|
| |
|
|
| def get_thinking_stream() -> str: |
| """获取 LLM 思维流(最近的对话历史)""" |
| life = _get_life() |
| if life.history is None or not life.history.messages: |
| return "暂无思维记录" |
|
|
| |
| recent_msgs = life.history.messages[-20:] |
| output = [] |
| for msg in recent_msgs: |
| role = msg.get("role", "unknown") |
| content = msg.get("content", "") |
|
|
| if isinstance(content, list): |
| |
| texts = [] |
| for block in content: |
| if isinstance(block, dict): |
| if block.get("type") == "text": |
| texts.append(block.get("text", "")) |
| elif block.get("type") == "tool_use": |
| name = block.get("name", "") |
| inputs = block.get("input", {}) |
| texts.append(f"[调用工具: {name}({inputs})]") |
| elif block.get("type") == "tool_result": |
| texts.append("[工具结果]") |
| content = "\n".join(texts) |
|
|
| output.append(f"**{role}**: {content}\n") |
|
|
| return "\n".join(output) |
|
|
|
|
def list_files(path: str = ".") -> str:
    """List the entries of a directory, one per line, sorted by path.

    Args:
        path: Directory to list; defaults to the current directory.

    Returns:
        One line per entry, prefixed with a folder or file icon; a
        placeholder for an empty directory; or a message when the path is
        missing, is a regular file, or listing raises.
    """
    try:
        target = Path(path)
        if not target.exists():
            return f"路径不存在: {path}"
        if target.is_file():
            return f"这是文件,不是目录: {path}"

        listing = "\n".join(
            f"{'📁' if entry.is_dir() else '📄'} {entry.name}"
            for entry in sorted(target.iterdir())
        )
        # An empty directory yields an empty string, which falls through
        # to the placeholder.
        return listing or "空目录"
    except Exception as e:
        return f"错误: {e}"
|
|
|
|
def read_file_content(path: str, max_chars: int = 10000) -> str:
    """Return the text of a file, truncated to *max_chars* characters.

    Only ``max_chars + 1`` characters are ever read, so pointing the console
    at a very large file no longer loads the whole file into memory just to
    discard most of it.

    Args:
        path: File to read. Decoded as UTF-8; undecodable bytes are dropped.
        max_chars: Maximum number of characters returned before the
            truncation notice is appended. Expected to be non-negative.

    Returns:
        The (possibly truncated) file text, or a message when the path is
        missing, is not a regular file, or reading raises.
    """
    try:
        file_path = Path(path)
        if not file_path.exists():
            return f"文件不存在: {path}"
        if not file_path.is_file():
            return f"这不是文件: {path}"

        with file_path.open(encoding="utf-8", errors="ignore") as handle:
            # One extra character is enough to tell "exactly at the limit"
            # apart from "over the limit".
            content = handle.read(max_chars + 1)
        if len(content) > max_chars:
            content = content[:max_chars] + "\n\n... (文件过长,已截断)"

        return content
    except Exception as e:
        return f"读取错误: {e}"
|
|
|
|
def execute_command(command: str) -> str:
    """Run a shell command and return its captured output as text.

    Args:
        command: Command line passed to the shell as-is.

    Returns:
        Non-empty stdout and stderr sections followed by the return code,
        or a message when the command exceeds 30 seconds or fails to run.
    """
    # NOTE: shell=True executes the string verbatim in a shell. That is the
    # purpose of this console, so anyone who can reach the UI can run
    # arbitrary commands.
    try:
        completed = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=30,
            encoding="utf-8",
            errors="ignore",
        )
    except subprocess.TimeoutExpired:
        return "命令执行超时(30秒)"
    except Exception as e:
        return f"执行错误: {e}"

    streams = (("STDOUT", completed.stdout), ("STDERR", completed.stderr))
    # Empty streams are left out; the return code is always reported last.
    sections = [f"{label}:\n{text}" for label, text in streams if text]
    sections.append(f"返回码: {completed.returncode}")
    return "\n".join(sections)
|
|
|
|
def send_stimulus(stimulus_type: str, content: str) -> str:
    """Deliver an external stimulus to the digital life.

    Args:
        stimulus_type: Category forwarded to ``receive_stimulus``.
        content: Payload forwarded to ``receive_stimulus``.

    Returns:
        A confirmation echoing the type and content, or a failure message
        when ``receive_stimulus`` raises.
    """
    target = _get_life()
    try:
        target.receive_stimulus(stimulus_type, content)
    except Exception as exc:
        return f"发送失败: {exc}"
    return f"已发送刺激 - 类型: {stimulus_type}, 内容: {content}"
|
|
|
|
def browser_action(action: str, url: str = "", selector: str = "", text: str = "") -> str:
    """Run one browser operation through the digital life's browser client.

    Unlike the original, failures (including the timeout raised while waiting
    on the event loop) are returned as an error string, matching the other
    console handlers, instead of propagating to the UI. Unknown actions are
    rejected before any coroutine is scheduled.

    Args:
        action: One of ``navigate``, ``click``, ``type``, ``screenshot``,
            ``extract``.
        url: Target URL; used by ``navigate``.
        selector: Selector of the target element; used by ``click`` and
            ``type``.
        text: Text to enter; used by ``type``.

    Returns:
        Whatever the browser client returns for the operation, or a message
        when the client is not initialised, the action is unknown, or the
        operation fails.
    """
    life = _get_life()
    browser_client = life.browser_client
    if browser_client is None:
        return "浏览器客户端未初始化"

    # Each entry defers the client call until it runs inside the coroutine.
    operations = {
        "navigate": lambda: browser_client.navigate(url),
        "click": lambda: browser_client.click(selector),
        "type": lambda: browser_client.type_text(selector, text),
        "screenshot": lambda: browser_client.screenshot(),
        "extract": lambda: browser_client.extract_content(),
    }
    operation = operations.get(action)
    if operation is None:
        return f"未知操作: {action}"

    async def _action():
        # Wrapping in a real coroutine keeps _run_async usable even if the
        # client method returns a non-coroutine awaitable.
        return await operation()

    try:
        return _run_async(_action())
    except Exception as e:
        # Include the exception type: some errors (e.g. timeouts) have an
        # empty message.
        return f"浏览器操作失败: {type(e).__name__}: {e}"
|
|
|
|
def _start_background_life():
    """Start the digital life's main loop on a daemon thread.

    The thread creates its own event loop, publishes it through the
    module-global ``_loop`` (the loop ``_get_loop()`` hands to
    ``_run_async``), and runs ``life.run_forever()`` on it. The loop is
    closed when that coroutine ends or raises.
    """
    life = _get_life()

    def _worker():
        global _loop
        life_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(life_loop)
        _loop = life_loop
        try:
            life_loop.run_until_complete(life.run_forever())
        except Exception as exc:
            print(f"Background life error: {exc}")
        finally:
            life_loop.close()

    threading.Thread(target=_worker, daemon=True, name="digital-life").start()
|
|
|
|
| |
|
|
# Gradio UI definition. Components are created in layout order, so the order
# of statements below determines what appears on the page.
# NOTE(review): the nesting below was reconstructed from the component
# structure (Blocks > Row > two Columns > Tabs) - confirm against the
# intended layout.
with gr.Blocks(title="Entelechy 控制台") as demo:
    gr.Markdown("# 🧬 Entelechy - 数字生命控制台")
    gr.Markdown("这是观察和控制数字生命的控制台,不是聊天机器人")

    with gr.Row():
        # Narrower column (scale=1): control panel with one tab per handler.
        with gr.Column(scale=1):
            gr.Markdown("## 🎮 控制面板")

            # Shell command execution -> execute_command
            with gr.Tab("命令执行"):
                cmd_input = gr.Textbox(label="Shell 命令", placeholder="例如: ls -la, git status")
                cmd_btn = gr.Button("执行", variant="primary")
                cmd_output = gr.Textbox(label="输出", lines=10)
                cmd_btn.click(execute_command, inputs=cmd_input, outputs=cmd_output)

            # External stimulus injection -> send_stimulus
            with gr.Tab("发送刺激"):
                stimulus_type = gr.Dropdown(
                    choices=["message", "event", "task", "reminder"],
                    label="刺激类型",
                    value="message"
                )
                stimulus_content = gr.Textbox(label="刺激内容", placeholder="例如: 检查一下系统状态")
                stimulus_btn = gr.Button("发送", variant="primary")
                stimulus_output = gr.Textbox(label="结果")
                stimulus_btn.click(
                    send_stimulus,
                    inputs=[stimulus_type, stimulus_content],
                    outputs=stimulus_output
                )

            # Browser control -> browser_action; the input order matches its
            # (action, url, selector, text) parameters.
            with gr.Tab("浏览器控制"):
                browser_action_type = gr.Dropdown(
                    choices=["navigate", "click", "type", "screenshot", "extract"],
                    label="操作",
                    value="navigate"
                )
                browser_url = gr.Textbox(label="URL (navigate)", placeholder="https://...")
                browser_selector = gr.Textbox(label="CSS 选择器 (click/type)", placeholder="#button, .input")
                browser_text = gr.Textbox(label="输入文本 (type)", placeholder="要输入的内容")
                browser_btn = gr.Button("执行", variant="primary")
                browser_output = gr.Textbox(label="结果", lines=8)
                browser_btn.click(
                    browser_action,
                    inputs=[browser_action_type, browser_url, browser_selector, browser_text],
                    outputs=browser_output
                )

            # File system browsing: both buttons share one path box and one
            # output box (list_files / read_file_content).
            with gr.Tab("文件系统"):
                file_path = gr.Textbox(label="路径", value=".", placeholder="目录或文件路径")
                list_btn = gr.Button("列出文件", size="sm")
                read_btn = gr.Button("读取文件", size="sm")
                file_output = gr.Textbox(label="输出", lines=15)
                list_btn.click(list_files, inputs=file_path, outputs=file_output)
                read_btn.click(read_file_content, inputs=file_path, outputs=file_output)

        # Wider column (scale=2): thinking stream rendered by
        # get_thinking_stream, refreshed manually via the button.
        with gr.Column(scale=2):
            gr.Markdown("## 🧠 LLM 思维流")
            gr.Markdown("实时显示数字生命的思考过程和工具调用")
            thinking_output = gr.Textbox(label="思维记录", lines=35, show_label=False)
            refresh_btn = gr.Button("🔄 刷新思维流")
            refresh_btn.click(get_thinking_stream, outputs=thinking_output)

    # Populate the thinking stream once when the page loads.
    demo.load(get_thinking_stream, outputs=thinking_output)
|
|
|
|
if __name__ == "__main__":
    # Start the digital-life loop on its daemon thread before serving the UI.
    _start_background_life()

    # Serve the console on port 7860.
    # NOTE(review): server_name="0.0.0.0" presumably binds all network
    # interfaces, and this UI exposes execute_command (shell=True) - confirm
    # the host is only reachable by trusted users.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
    )
|
|