| |
| |
| """ |
| 修复说明: |
| 1. 核心问题: 容器环境下缺乏对/app/runs目录的写入权限(PermissionError: [Errno 13]) |
| 2. 解决方案: |
| - 实现更健壮的存储路径选择机制 |
| - 增加权限验证和自动回退逻辑 |
| - 使用临时目录作为最终回退路径 |
| - 修复路径处理中的绝对/相对路径问题 |
| - 增强错误处理和日志记录 |
| 3. 主要修改点: |
| - _ensure_dir(): 增加权限验证 |
| - pick_storage_root(): 增强路径选择逻辑 |
| - 目录初始化: 使用绝对路径并验证可写性 |
| - run_in_sandbox(): 使用更安全的临时目录 |
| - 增加多级回退机制(从/data → ~/.cache → /tmp) |
| """ |
# Bootstrap guard: only create DEBUG_BUFFER when it does not already exist
# (e.g. after a hot reload), so previously buffered log lines survive.
try:
    DEBUG_BUFFER
except NameError:
    DEBUG_BUFFER = []
|
|
| |
| from datetime import datetime |
|
|
def _sanitize(o):
    # Bootstrap stub: pass values through unchanged until the full
    # secret-masking _sanitize is defined later in the module.
    return o
|
|
def debug(level, msg, details=None):
    """Bootstrap logger used before the full `debug` below replaces it.

    Prints a timestamped line (plus `details` when given) and mirrors the
    line into DEBUG_BUFFER, capped at 1000 entries. Never raises.
    """
    try:
        stamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    except Exception:
        stamp = "time"
    entry = f"[{stamp}] [{str(level).upper():<8}] {msg}"
    try:
        print(entry)
        if details is not None:
            print(details)
    except Exception:
        pass
    try:
        DEBUG_BUFFER.append(entry)
        overflow = len(DEBUG_BUFFER) - 1000
        if overflow > 0:
            del DEBUG_BUFFER[:overflow]
    except Exception:
        pass
|
|
def get_debug_text() -> str:
    """Return the last 2000 buffered log lines; "" when unavailable."""
    try:
        recent = DEBUG_BUFFER[-2000:]
        return "\n".join(recent)
    except Exception:
        return ""
| |
| |
| |
|
|
| import gradio as gr |
| import requests, os, re, subprocess, json, sys, html, time, shutil, zipfile, shlex, threading, hashlib, inspect, traceback, urllib.parse |
| from pathlib import Path |
| from datetime import datetime |
| import tempfile |
| import shutil |
| import concurrent.futures |
| from typing import List, Dict, Any, Tuple, Optional |
| from dataclasses import dataclass, field |
| import ast, difflib |
| from collections import deque |
|
|
# Application title displayed in the UI.
APP_NAME = (
    "AI 自主编程修复代理(团队规划 + 持久化 + 实时日志 + 选择修复 + 审计 + 智能补丁)"
)
|
|
|
|
| |
| def _ensure_dir(d: str) -> bool: |
| try: |
| path = Path(d) |
| if not path.is_absolute(): |
| path = (Path.cwd() / path).resolve() |
| path.mkdir(parents=True, exist_ok=True) |
| |
| t = path / ".wtest" |
| t.write_text("ok", encoding="utf-8") |
| try: |
| t.unlink() |
| except Exception: |
| pass |
| |
| if not os.access(str(path), os.R_OK) or not os.access(str(path), os.W_OK): |
| return False |
| return True |
| except Exception as e: |
| try: |
| debug("WARN", f"目录验证失败: {d}", {"error": str(e), "path": str(d)}) |
| except Exception: |
| print(f"[WARN] 目录验证失败: {d} -> {e}") |
| return False |
|
|
|
|
def pick_storage_root() -> str:
    """Choose the first usable persistent-storage root.

    Tried in order: $SPACE_STORAGE, /data, ~/.cache/pyzdh, ./data, CWD.
    When none is writable, falls back to a folder under the system temp
    directory so the app can still start in locked-down containers.
    """
    ordered = [
        os.getenv("SPACE_STORAGE", "") or "",
        "/data",
        str(Path.home() / ".cache" / "pyzdh"),
        str(Path.cwd() / "data"),
        str(Path.cwd()),
    ]
    tried = set()
    for raw in ordered:
        if not raw:
            continue
        try:
            resolved = str(Path(raw).resolve())
        except Exception:
            resolved = raw
        if resolved in tried:
            continue
        tried.add(resolved)
        if _ensure_dir(resolved):
            try:
                debug("INFO", f"选择存储根目录: {resolved}")
            except Exception:
                pass
            return resolved
    last_resort = str(Path(tempfile.gettempdir()) / "pyzdh_storage")
    _ensure_dir(last_resort)
    try:
        debug("WARN", f"回退到临时目录: {last_resort}")
    except Exception:
        pass
    return last_resort
|
|
|
|
| |
# Resolve the storage root once at import time; all persistent state lives
# under it.
STORAGE_ROOT = pick_storage_root()
storage_path = Path(STORAGE_ROOT)


# Absolute paths for every persistent location used by the app.
RUN_ROOT = str((storage_path / "runs").resolve())
LOGS_DIR = str((storage_path / "logs").resolve())
PROJECT_ROOT = str((storage_path / "projects").resolve())
TOOLS_ROOT = str((storage_path / "mcp_tools").resolve())
UPLOADS_DIR = str((storage_path / "uploads").resolve())
TASKS_FILE = str((storage_path / "tasks.json").resolve())
KEYS_FILE = str((storage_path / "keys.json").resolve())


# (label, path) pairs that must exist and be writable before startup.
essential_dirs = [
    ("runs", RUN_ROOT),
    ("logs", LOGS_DIR),
    ("projects", PROJECT_ROOT),
    ("mcp_tools", TOOLS_ROOT),
    ("uploads", UPLOADS_DIR)
]


for dir_name, dir_path in essential_dirs:
    if not _ensure_dir(dir_path):
        # Per-directory fallback: an unusable directory is replaced by one
        # under the system temp dir instead of aborting the whole app.
        fallback_path = str(Path(tempfile.gettempdir()) / "pyzdh" / dir_name)
        debug("WARN", f"目录{dir_path}不可用,回退到{fallback_path}")
        _ensure_dir(fallback_path)

        # Rebind the matching module-level constant to the fallback path.
        if dir_name == "runs":
            RUN_ROOT = fallback_path
        elif dir_name == "logs":
            LOGS_DIR = fallback_path
        elif dir_name == "projects":
            PROJECT_ROOT = fallback_path
        elif dir_name == "mcp_tools":
            TOOLS_ROOT = fallback_path
        elif dir_name == "uploads":
            UPLOADS_DIR = fallback_path


debug("INFO", "存储目录初始化完成", {
    "storage_root": STORAGE_ROOT,
    "runs": RUN_ROOT,
    "logs": LOGS_DIR,
    "projects": PROJECT_ROOT,
    "uploads": UPLOADS_DIR
})
|
|
| |
# GitPython is optional: when the import fails (e.g. no git binary), stub
# out Repo / GitCommandError so the rest of the module still imports.
try:
    os.environ["GIT_PYTHON_REFRESH"] = "quiet"  # silence git-binary probe
    from git import Repo, GitCommandError


    GIT_AVAILABLE = True
except Exception as e:
    print(f"[WARN] GitPython unavailable: {e}")
    GIT_AVAILABLE = False


    class Repo: ...


    class GitCommandError(Exception): ...
|
|
|
|
| |
# Hugging Face inference endpoint prefix.
HF_API_URL = "https://api-inference.huggingface.co/models/"
# Default API base URLs per provider id.
DEFAULT_BASES = {
    "openai": "https://api.openai.com/v1",
    "groq": "https://api.groq.com/openai/v1",
    "mistral": "https://api.mistral.ai/v1",
    "deepseek": "https://api.deepseek.com/v1",
    "openrouter": "https://openrouter.ai/api/v1",
    "perplexity": "https://api.perplexity.ai/v1",
    "xai": "https://api.x.ai/v1",
    "azure": "https://YOUR-RESOURCE.openai.azure.com",
    "anthropic": "https://api.anthropic.com/v1",
    "siliconflow": "https://api.siliconflow.cn/v1",
}
# Suggested model ids per provider, shown in the UI dropdown.
RECOMMENDED_MODELS = {
    "gemini": ["gemini-1.5-flash", "gemini-1.5-pro"],
    "openai": ["gpt-4o-mini", "gpt-4o"],
    "anthropic": ["claude-3-5-sonnet-20240620", "claude-3-haiku-20240307"],

    "groq": ["llama-3.1-70b-versatile"],
    "mistral": ["mistral-large-latest"],
    "deepseek": ["deepseek-chat"],
    "openrouter": ["openai/gpt-4o"],
    "hf": ["Qwen/Qwen1.5-7B-Chat"],
    "azure": ["<你的部署名>"],
    "perplexity": ["llama-3-sonar-large-32k-online"],
    "xai": ["grok-1"],
    "mock": ["mock-echo"],
    "siliconflow": ["deepseek-ai/DeepSeek-V2", "alibaba/Qwen2-7B-Instruct"],
}
# Providers that speak the OpenAI chat-completions wire format.
OPENAI_LIKE = {
    "openai",
    "groq",
    "mistral",
    "deepseek",
    "openrouter",
    "perplexity",
    "xai",
    "siliconflow",
}
# "provider:model" -> whether the model was verified reachable.
MODEL_OK_CACHE: Dict[str, bool] = {}
# Default system prompt for the repair agent.
DEFAULT_GLOBAL_HINT_STR = "你是一个自动化编程维修测试员,根据要求自动运行测试修复以下代码,为正常没有问题所有功能完整实现"
# HTTP request timeout in seconds (overridable via $REQ_TIMEOUT).
DEFAULT_REQ_TIMEOUT = int(os.getenv("REQ_TIMEOUT", "1200"))
|
|
| |
# Fix: do not unconditionally rebind the buffer here — that discarded every
# log line the bootstrap logger collected during storage initialization.
if "DEBUG_BUFFER" not in globals():
    DEBUG_BUFFER: List[str] = []
|
|
|
|
| def _sanitize(o): |
| try: |
| if isinstance(o, dict): |
| d = {} |
| for k, v in o.items(): |
| if any(s in k.lower() for s in ["key", "token", "secret", "auth"]): |
| d[k] = ( |
| (str(v)[:4] + "..." + str(v)[-4:]) |
| if isinstance(v, str) and len(str(v)) > 8 |
| else "***" |
| ) |
| else: |
| d[k] = _sanitize(v) |
| return d |
| if isinstance(o, list): |
| return [_sanitize(x) for x in o] |
| return o |
| except Exception: |
| return o |
|
|
|
|
def debug(level, msg, details=None):
    """Full logger, replacing the bootstrap `debug` defined earlier.

    Builds a "[ts] [LEVEL] [file:line(func)] msg" line, appends sanitized
    `details` (JSON for dict/list), writes it to a daily file in LOGS_DIR,
    mirrors it into DEBUG_BUFFER (capped at 4000 lines), and prints it.
    """
    try:
        fr = inspect.stack()[1]
        loc = f"{os.path.basename(fr.filename)}:{fr.lineno}({fr.function})"
    except Exception:
        loc = "unknown"

    ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    # Fix: str(level) — the bootstrap logger accepted non-string levels, but
    # level.upper() here raised AttributeError for them.
    line = f"[{ts}] [{str(level).upper():<8}] [{loc}] {msg}"

    if details is not None:
        try:
            d = _sanitize(details)
            line += os.linesep + (json.dumps(d, ensure_ascii=False, indent=2) if isinstance(d, (dict, list)) else str(d))
        except Exception:
            line += os.linesep + str(details)

    try:
        Path(LOGS_DIR).mkdir(parents=True, exist_ok=True)
        log_file = Path(LOGS_DIR) / f"debug-{datetime.now().strftime('%Y%m%d')}.log"
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(line + os.linesep + "=" * 80 + os.linesep)
    except Exception as e:
        print(f"DEBUG_LOG_ERROR: {e}", file=sys.stderr)
        print(line, file=sys.stderr)

    DEBUG_BUFFER.append(line)
    if len(DEBUG_BUFFER) > 4000:
        del DEBUG_BUFFER[: len(DEBUG_BUFFER) - 4000]

    print(line)
|
|
|
|
def get_debug_text() -> str:
    """Return the last 2000 buffered debug lines joined with newlines."""
    recent = DEBUG_BUFFER[-2000:]
    return "\n".join(recent)
|
|
|
|
| |
# Debounced auto-save state; see auto_save_ui().
AUTO_SAVE_ENABLED = True
_LAST_SAVE_TS = 0.0  # timestamp (epoch seconds) of the last auto-save
|
|
|
|
def load_all() -> dict:
    """Load persisted settings from keys.json; return {} on any error."""
    try:
        if Path(KEYS_FILE).exists():
            txt = Path(KEYS_FILE).read_text("utf-8")
            debug("CONF", "载入 keys.json", {"path": KEYS_FILE, "bytes": len(txt)})
            return json.loads(txt)
    except Exception as e:
        debug(
            "CONF_ERR",
            "读取 keys.json 失败",
            {"err": str(e), "trace": traceback.format_exc()},
        )
    return {}
|
|
|
|
| def _merge_non_empty(old: dict, new: dict) -> dict: |
| out = dict(old or {}) |
| for k, v in (new or {}).items(): |
| if v is None: |
| continue |
| if isinstance(v, str) and v.strip() == "": |
| continue |
| out[k] = v |
| return out |
|
|
|
|
def save_json(obj: dict, auto=False) -> str:
    """Atomically persist `obj` to keys.json (tmp file + os.replace).

    Returns a user-facing status string; never raises."""
    try:
        tmp = KEYS_FILE + ".tmp"
        s = json.dumps(obj, ensure_ascii=False, indent=2)
        Path(tmp).write_text(s, "utf-8")
        os.replace(tmp, KEYS_FILE)  # atomic swap so readers never see partial writes
        debug(
            "CONF", "写入 keys.json", {"path": KEYS_FILE, "bytes": len(s), "auto": auto}
        )
        return f"✅ {'自动保存成功' if auto else '已保存到本地'}({datetime.now().strftime('%H:%M:%S')})"
    except Exception as e:
        debug(
            "CONF_ERR",
            "写入 keys.json 失败",
            {"err": str(e), "trace": traceback.format_exc()},
        )
        return f"❌ 保存失败:{e}"
|
|
|
|
def get_custom_providers() -> Dict[str, Dict[str, Any]]:
    """Return the user-defined provider map from config (always a dict)."""
    stored = load_all().get("custom_providers", {})
    return stored if isinstance(stored, dict) else {}
|
|
|
|
def provider_choices() -> List[str]:
    """Built-in provider ids followed by user-defined ones."""
    builtin = list(RECOMMENDED_MODELS.keys())
    custom = list(get_custom_providers().keys())
    return builtin + custom
|
|
|
|
def save_custom_provider(
    name: str, base: str, key: str, referer: str = "", title: str = ""
) -> str:
    """Validate and persist a user-defined provider entry.

    Returns a user-facing status string (validation error, save result,
    or internal-error message); never raises."""
    name, base, key = (name or "").strip(), (base or "").strip(), (key or "").strip()
    debug(
        "UI_ACTION",
        "尝试保存自定义API",
        {"name": name, "base": base, "referer": referer, "title": title},
    )
    if not name:
        return "❗ 请输入自定义提供商 ID"
    if not base or not key:
        return "❗ Base URL 和 API Key 不能为空"
    try:
        data = load_all()
        cps = data.get("custom_providers", {}) or {}
        if not isinstance(cps, dict):
            cps = {}  # discard a corrupted non-dict entry
        cps[name] = {
            "base": base,
            "key": key,
            "referer": (referer or ""),
            "title": (title or ""),
        }
        data["custom_providers"] = cps
        return save_json(data, auto=True)
    except Exception as e:
        debug(
            "EXCEPTION",
            "保存自定义API异常",
            {"err": str(e), "trace": traceback.format_exc()},
        )
        return f"❌ 内部错误,保存失败: {e}"
|
|
|
|
def remove_custom_provider(name: str) -> str:
    """Delete a user-defined provider by id and persist the config."""
    data = load_all()
    providers = data.get("custom_providers", {}) or {}
    if name not in providers:
        return "❌ 未找到该自定义提供商"
    providers.pop(name, None)
    data["custom_providers"] = providers
    return save_json(data, auto=True)
|
|
|
|
def save_all_ui(
    keys: dict,
    provider: str,
    model: str,
    github_token: str,
    global_hint: str = "",
    last_task: Optional[str] = None,
    auto=False,
) -> str:
    """Merge the current UI state into the persisted config and save it.

    Empty strings / None in `keys` never overwrite saved values (see
    _merge_non_empty); `last_task` is truncated to 4000 chars.
    Returns the save_json status string."""
    old = load_all()
    merged = _merge_non_empty(old, keys or {})
    merged["ui_provider"] = provider or old.get("ui_provider", "")
    merged["ui_model"] = model or old.get("ui_model", "")
    if github_token:
        merged["github_token"] = github_token
    merged["global_hint"] = global_hint or old.get("global_hint", "")
    if last_task is not None:
        lt = last_task.strip()
        merged["last_task"] = lt[:4000] + "…(截断)" if len(lt) > 4000 else lt
    debug(
        "CONF",
        "保存UI与键值",
        {"provider": merged.get("ui_provider"), "model": merged.get("ui_model")},
    )
    return save_json(merged, auto=auto)
|
|
|
|
def auto_save_ui(
    keys: dict,
    provider: str,
    model: str,
    github_token: str,
    global_hint: str = "",
    last_task: str = "",
):
    """Debounced auto-save of the UI state (at most one save per 1.2 s).

    Returns the save status string, or "" when skipped/disabled."""
    global _LAST_SAVE_TS
    if not AUTO_SAVE_ENABLED:
        return ""
    now = time.time()
    if now - _LAST_SAVE_TS >= 1.2:  # debounce window
        _LAST_SAVE_TS = now
        return save_all_ui(
            keys,
            provider,
            model,
            github_token,
            global_hint,
            last_task=last_task,
            auto=True,
        )
    return ""
|
|
|
|
def get_models_cache(provider: str) -> List[str]:
    """Return the cached model list for `provider` ([] when absent/invalid)."""
    cached = load_all().get("models_cache", {}).get(provider, [])
    if not isinstance(cached, list):
        return []
    debug("MODELS", "读取模型缓存", {"provider": provider, "count": len(cached)})
    return cached
|
|
|
|
def set_models_cache(provider: str, models: List[str]):
    """Persist the model list for `provider` into the config file."""
    data = load_all()
    cache = data.get("models_cache", {})
    cache[provider] = models or []
    data["models_cache"] = cache
    debug("MODELS", "写入模型缓存", {"provider": provider, "count": len(models or [])})
    save_json(data, auto=True)
|
|
|
|
def get_saved_rpm(provider: str, model: str) -> int:
    """Saved requests-per-minute cap for provider:model (0 = unlimited)."""
    limits = load_all().get("rate_limits", {})
    return int(limits.get(f"{provider}:{model}", 0) or 0)
|
|
|
|
def set_saved_rpm(provider: str, model: str, rpm: int):
    """Persist a non-negative RPM cap for provider:model."""
    data = load_all()
    limits = data.get("rate_limits", {})
    capped = int(max(0, rpm))
    limits[f"{provider}:{model}"] = capped
    data["rate_limits"] = limits
    debug("RL", "保存RPM限制", {"key": f"{provider}:{model}", "rpm": capped})
    save_json(data, auto=True)
| |
| |
def load_tasks_db() -> dict:
    """Load tasks.json; return an empty skeleton on read/parse failure."""
    try:
        if Path(TASKS_FILE).exists():
            txt = Path(TASKS_FILE).read_text("utf-8")
            debug("TASK", "加载任务库", {"path": TASKS_FILE, "bytes": len(txt)})
            return json.loads(txt)
    except Exception as e:
        debug(
            "TASK_ERR",
            "读取任务库失败",
            {"err": str(e), "trace": traceback.format_exc()},
        )
    return {"history": [], "last": {}}
|
|
|
|
def save_tasks_db(db: dict):
    """Atomically write tasks.json; return True on success, False on error."""
    try:
        tmp = TASKS_FILE + ".tmp"
        s = json.dumps(db, ensure_ascii=False, indent=2)
        Path(tmp).write_text(s, "utf-8")
        os.replace(tmp, TASKS_FILE)  # atomic swap
        debug("TASK", "写入任务库", {"path": TASKS_FILE, "bytes": len(s)})
        return True
    except Exception as e:
        debug(
            "TASK_ERR",
            "写入任务库失败",
            {"err": str(e), "trace": traceback.format_exc()},
        )
        return False
|
|
|
|
def save_task_state(
    task: str,
    files: List[str],
    baseline_code: str,
    req: str,
    exp: str,
    cli: str,
    provider: str,
    model: str,
    add_history=False,
) -> str:
    """Snapshot the current task setup into the tasks DB.

    The snapshot becomes `last` (chat histories from the previous `last`
    are carried over) and, when add_history=True, is also prepended to the
    history list, which is capped at 20 entries.
    Returns a user-facing status string."""
    db = load_tasks_db()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    payload = {
        "task": task or "",
        "files": files or [],
        "baseline_code": baseline_code or "",
        "required_kws": req or "",
        "expected_stdout": exp or "",
        "cli_args": cli or "",
        "provider": provider or "",
        "model": model or "",
        "time": now,
        # Preserve existing chat transcripts across snapshots.
        "chat_history": db.get("last", {}).get("chat_history", ""),
        "chat_history_general": db.get("last", {}).get("chat_history_general", ""),
    }
    db["last"] = payload
    if add_history:
        hist = db.get("history", [])
        hist.insert(0, payload)
        db["history"] = hist[:20]  # keep only the 20 most recent
    save_tasks_db(db)
    debug("TASK", "保存任务快照", {"time": now, "files": len(files or [])})
    return f"✅ 任务已保存({now})"
|
|
|
|
def persist_chat_history(hist: str) -> str:
    """Store the coding-chat transcript in the task DB's `last` slot."""
    db = load_tasks_db()
    snapshot = db.get("last", {})
    snapshot["chat_history"] = hist or ""
    db["last"] = snapshot
    save_tasks_db(db)
    debug("CHAT", "保存对话历史(编程)", {"chars": len(hist or "")})
    return "✔️ 对话已保存(编程)"
|
|
|
|
def persist_general_chat_history(hist: str) -> str:
    """Store the general-chat transcript in the task DB's `last` slot."""
    db = load_tasks_db()
    snapshot = db.get("last", {})
    snapshot["chat_history_general"] = hist or ""
    db["last"] = snapshot
    save_tasks_db(db)
    debug("CHAT", "保存对话历史(智能)", {"chars": len(hist or "")})
    return "✔️ 对话已保存(智能)"
|
|
|
|
def restore_last_task_state() -> dict:
    """Return the most recently saved task snapshot ({} when none)."""
    db = load_tasks_db()
    return db.get("last", {})
|
|
|
|
def get_task_history_titles() -> List[str]:
    """Human-readable "NN | time | goal" labels for the task history list."""
    titles: List[str] = []
    for i, item in enumerate(load_tasks_db().get("history", [])):
        stamp = item.get("time", "?")
        goal = (item.get("task", "") or "").strip()
        if goal:
            goal = goal[:30] + ("…" if len(goal) > 30 else "")
        else:
            goal = "(空任务)"
        titles.append(f"{i+1:02d} | {stamp} | {goal}")
    return titles
|
|
|
|
def get_task_history_by_index(idx: int) -> dict:
    """History entry at `idx`, or {} when the index is out of range."""
    history = load_tasks_db().get("history", [])
    if 0 <= idx < len(history):
        return history[idx]
    return {}
|
|
|
|
| |
# File extensions treated as text when previewing uploaded attachments.
TEXT_EXTS = {
    ".py", ".txt", ".json", ".csv", ".yml", ".yaml", ".ini", ".conf", ".md", ".xml",
    ".html", ".css", ".js", ".ts", ".tsx", ".jsx", ".toml", ".cfg", ".log", ".ipynb",
    ".java", ".kt", ".go", ".rs", ".c", ".h", ".cpp", ".hpp", ".cs", ".php", ".rb",
    ".sh", ".ps1", ".r", ".m", ".scala", ".pl", ".erl", ".ex", ".exs", ".dart",
    ".swift", ".sql",
}
|
|
|
|
def resolve_file_path(f) -> str:
    """Best-effort extraction of a filesystem path from assorted upload
    objects: plain str/Path, dicts with name/path keys, wrapper objects
    exposing name/path/orig_name/tmp_path, or nested file objects.
    Returns "" when nothing usable is found.
    """
    try:
        if not f:
            return ""
        if isinstance(f, (str, Path)):
            return str(f)
        if isinstance(f, dict):
            for field_name in ("name", "path"):
                candidate = f.get(field_name)
                if isinstance(candidate, str) and candidate:
                    return candidate
            return ""
        for attr in ("name", "path", "orig_name", "tmp_path"):
            candidate = getattr(f, attr, None)
            if isinstance(candidate, str) and candidate:
                return candidate
        wrapped = getattr(f, "file", None) or getattr(f, "tempfile", None)
        if wrapped is not None:
            inner_name = getattr(wrapped, "name", "")
            if isinstance(inner_name, str) and inner_name:
                return inner_name
    except Exception:
        return ""
    return ""
|
|
|
|
def looks_text(path: str, max_probe=4096) -> bool:
    """Heuristic text detection: >85% printable-ASCII/whitespace bytes in
    the first `max_probe` bytes. Empty files count as text; unreadable
    paths return False.
    """
    try:
        sample = Path(path).read_bytes()[:max_probe]
    except Exception:
        return False
    if not sample:
        return True
    allowed = bytes(range(32, 127)) + b"\n\r\t\f\b"
    printable = sum(1 for byte in sample if byte in allowed)
    return printable / max(1, len(sample)) > 0.85
|
|
|
|
def persist_files_to_uploads(files: List[str]) -> List[str]:
    """Copy uploaded files into UPLOADS_DIR with a "ts-hash-name" prefix.

    Already-persisted files (recognized by the prefix or by living inside
    the uploads dir) are reused instead of copied again. When UPLOADS_DIR
    is not writable, rebinds the global to a temp-dir fallback.
    Returns the list of persisted paths."""
    global UPLOADS_DIR

    debug("ENTRY", "开始持久化文件", {"input_files_count": len(files or [])})
    saved = []

    # Validate the uploads dir; fall back to the system temp dir if needed.
    upload_path = Path(UPLOADS_DIR)
    if not _ensure_dir(str(upload_path)):
        upload_path = Path(tempfile.gettempdir()) / "pyzdh_uploads"
        try:
            upload_path.mkdir(parents=True, exist_ok=True)
        except Exception:
            pass
        debug("WARN", f"上传目录不可写,回退到: {upload_path}")
        UPLOADS_DIR = str(upload_path)

    try:
        upload_path.mkdir(parents=True, exist_ok=True)
    except Exception:
        pass

    for f in files or []:
        if not f:
            continue

        rp = str(f)
        if not os.path.isfile(rp):
            debug("SKIP", "路径非文件", {"path": rp})
            continue

        # Skip files already carrying the "ts-hash-" persistence prefix.
        try:
            bn = Path(rp).name
            if re.match(r"^\d{10}-[0-9a-f]{10}-", bn):
                persisted = upload_path / bn
                if persisted.exists():
                    saved.append(str(persisted))
                    debug("SKIP", "文件已持久化", {"path": str(persisted)})
                    continue
        except Exception:
            pass

        try:
            # Files already inside the uploads dir are used as-is.
            if str(Path(rp).resolve()).startswith(str(upload_path.resolve())):
                saved.append(rp)
                debug("SKIP", "文件已在上传目录", {"path": rp})
                continue

            # Unique destination name: timestamp + sha1(path+mtime) prefix.
            original_basename = Path(rp).name
            ts = int(time.time())
            h = hashlib.sha1((rp + str(Path(rp).stat().st_mtime)).encode("utf-8")).hexdigest()[:10]
            dest = str(upload_path / f"{ts}-{h}-{original_basename}")

            shutil.copy2(rp, dest)  # copy2 preserves mtime/metadata
            saved.append(dest)
            debug("SUCCESS", "成功持久化文件", {"src": rp, "dest": dest})

        except Exception as e:
            debug("EXCEPTION", "持久化文件失败", {
                "src": rp,
                "error": str(e),
                "traceback": traceback.format_exc()
            })

    debug("EXIT", "完成文件持久化", {
        "saved_count": len(saved),
        "upload_dir": UPLOADS_DIR
    })
    return saved
|
|
|
|
def build_attachments_preview(
    paths: List[str], per_file_chars=1200, max_files=5, max_total_chars=8000
) -> str:
    """Build a truncated textual preview of uploaded attachments for the
    LLM prompt.

    At most `max_files` text files contribute up to `per_file_chars` each,
    bounded by `max_total_chars` overall; non-text and unreadable files
    are listed by name only. Returns "" when there is nothing to show."""
    previews, total, cnt = [], 0, 0
    for p in paths or []:
        if not p or not os.path.exists(p):
            continue
        name = Path(p).name
        is_text = Path(p).suffix.lower() in TEXT_EXTS or looks_text(p)
        if not is_text:
            previews.append(f"- {name}(非文本文件,路径:{p})")
            continue
        try:
            raw = Path(p).read_text("utf-8", errors="replace")
        except Exception:
            try:
                raw = Path(p).read_text("latin-1", errors="replace")
            except Exception:
                raw = ""
        chunk = raw[:per_file_chars]
        if not chunk.strip():
            previews.append(f"- {name}(读取为空)")
            continue
        piece = f"文件: {name}\n```\n{chunk}\n```"
        # Enforce the global budget; oversized files are name-only entries.
        if total + len(piece) > max_total_chars:
            previews.append(f"- {name}(内容过长,已略)")
            continue
        previews.append(piece)
        total += len(piece)
        cnt += 1
        if cnt >= max_files:
            break
    return ("以下为已上传附件的内容预览(截断展示,仅供参考):\n" + "\n\n".join(previews) if previews else "")
|
|
|
|
| |
# Global run-control state shared across UI callbacks.
STOP_FLAG = False  # set by stop_all() to abort in-flight loops
CURRENT_PROCS: List[subprocess.Popen] = []  # live child processes
WATCH_STOP = False  # signals log-watcher threads to exit
CURRENT_RUN_DIR = ""  # directory of the currently active run
|
|
|
|
def _register_proc(p: subprocess.Popen | None):
    """Track a spawned process so stop_all() can terminate it later."""
    if p is not None:
        CURRENT_PROCS.append(p)
|
|
|
|
def stop_all():
    """Terminate every tracked child process and raise the global stop flag.

    Each process gets terminate(), a 0.1 s grace period, then kill().
    Returns a user-facing status string; never raises."""
    global STOP_FLAG
    STOP_FLAG = True
    for p in list(CURRENT_PROCS):  # copy: the list is cleared below
        try:
            if p and p.poll() is None:
                p.terminate()
                time.sleep(0.1)
                if p.poll() is None:
                    p.kill()  # escalate when terminate was ignored
        except Exception as e:
            try:
                debug("WARN", "停止进程失败", {"error": str(e)})
            except Exception:
                pass
    CURRENT_PROCS.clear()
    try:
        debug("WARN", "所有进程已停止")
    except Exception:
        pass
    return "⏹️ 已停止所有进程"
|
|
|
|
def read_tail(path: str, max_bytes=800_000) -> str:
    """Return up to the last `max_bytes` bytes of a file decoded as UTF-8
    (with replacement); "" for missing or unreadable files."""
    try:
        target = Path(path)
        if not target.exists():
            return ""
        raw = target.read_bytes()
        tail = raw[-max_bytes:] if len(raw) > max_bytes else raw
        return tail.decode("utf-8", "replace")
    except Exception:
        return ""
|
|
|
|
| |
@dataclass
class CodeFingerprint:
    """Structural summary of a Python source file, used to verify that an
    LLM rewrite stays in the same domain as the original code."""
    imports: List[str] = field(default_factory=list)  # top-level module names
    classes: List[str] = field(default_factory=list)  # class names
    functions: List[str] = field(default_factory=list)  # function names
    domain_keywords: List[str] = field(default_factory=list)  # matched domain terms
|
|
|
|
# Keyword lists used to classify code into a coarse domain (checked in
# priority order: trading > API > ML; see domain_name()).
TRADING_KWS = ["order", "trade", "price", "buy", "sell", "position", "market", "exchange", "okx", "binance", "leverage", "symbol", "instrument", "algo"]
API_KWS = ["request", "http", "api", "endpoint", "token", "headers", "payload"]
ML_KWS = ["train", "predict", "model", "fit", "dataset"]
|
|
|
|
def extract_fingerprint(code: str) -> CodeFingerprint:
    """Parse `code` and collect top-level import names, class/function
    names and domain keywords into a CodeFingerprint.

    Parse errors are tolerated — the keyword scan still runs on the raw
    text. Lists are de-duplicated, sorted and capped."""
    fp = CodeFingerprint()
    try:
        tree = ast.parse(code)
        for n in ast.walk(tree):
            if isinstance(n, ast.Import):
                for a in n.names:
                    fp.imports.append(a.name.split(".")[0])
            elif isinstance(n, ast.ImportFrom):
                if n.module:
                    fp.imports.append(n.module.split(".")[0])
            elif isinstance(n, ast.ClassDef):
                fp.classes.append(n.name)
            # Fix: also record async functions — ast.FunctionDef alone
            # silently dropped every `async def` from the fingerprint.
            elif isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)):
                fp.functions.append(n.name)
    except Exception:
        pass
    low = (code or "").lower()
    domain = []
    for kw in TRADING_KWS:
        if kw in low:
            domain.append(kw)
    for kw in API_KWS:
        if kw in low and kw not in domain:
            domain.append(kw)
    for kw in ML_KWS:
        if kw in low and kw not in domain:
            domain.append(kw)
    fp.domain_keywords = domain[:20]
    fp.imports = sorted(set(fp.imports))[:30]
    fp.classes = sorted(set(fp.classes))[:50]
    fp.functions = sorted(set(fp.functions))[:120]
    return fp
|
|
|
|
def domain_name(fp: CodeFingerprint) -> str:
    """Map a fingerprint's domain keywords to a coarse domain label."""
    kws = set(fp.domain_keywords)
    if kws & set(TRADING_KWS):
        return "交易/金融"
    if kws & set(API_KWS):
        return "API/网络"
    if kws & set(ML_KWS):
        return "机器学习"
    return "通用"
|
|
|
|
def relevance_check(orig: CodeFingerprint, new: CodeFingerprint) -> Tuple[bool, str]:
    """Score how closely `new` resembles `orig` on imports, main symbols
    and domain keywords; passes when at least 2 of 3 criteria hold.
    Returns (ok, human-readable score string)."""
    if not any([orig.imports, orig.classes, orig.functions, orig.domain_keywords]):
        return True, "无基准,不做相关性约束"
    score, total = 0, 3
    if orig.imports:
        # At least a third of the baseline imports must survive the rewrite.
        if len(set(orig.imports) & set(new.imports)) >= max(1, len(orig.imports) // 3):
            score += 1
    else:
        # NOTE(review): free point when the baseline has no imports —
        # confirm this `else` is intended to pair with the outer `if`.
        score += 1
    # Main symbols retained: any original class, or any of the first 10 functions.
    kept_main = any(c in new.classes for c in orig.classes) or any(f in new.functions for f in orig.functions[:10])
    if kept_main:
        score += 1
    if set(orig.domain_keywords) & set(new.domain_keywords):
        score += 1
    return score >= 2, f"相关性评分 {score}/{total}"
|
|
|
|
def compile_syntax_ok(code: str) -> Tuple[bool, str]:
    """Compile-check `code`; (False, message) only for SyntaxError.
    Other compile-time errors are deliberately treated as OK."""
    try:
        compile(code, "<main.py>", "exec")
    except SyntaxError as e:
        return False, f"SyntaxError: {e}"
    except Exception:
        return True, ""
    return True, ""
|
|
|
|
def detect_markdown_fence(s: str) -> bool:
    """True when the text still contains a markdown code fence."""
    text = s or ""
    return "```" in text
|
|
|
|
def strip_code_fences(s: str) -> str:
    """Extract the first fenced code block from LLM output.

    A single pattern handles both tagged (```python / ```py) and bare
    ``` fences, since the language tag is optional — the previous second
    pass for bare fences was dead code, and its `if m` guard inside an
    already-true `if m:` branch was vacuous. Falls back to the trimmed
    input when no fence is found.
    """
    if not s:
        return s
    m = re.search(r"```(?:python|py)?\s*\n(.*?)```", s, re.S | re.I)
    return m.group(1).strip() if m else s.strip()
|
|
|
|
def parse_requirements(code: str) -> List[str]:
    """Read an optional first-line "# requirements: a, b" directive and
    return its third-party entries (max 40), with stdlib names filtered.

    Fix: version specifiers (==, >=, <=, <, >, !=, ~=) are stripped before
    the stdlib check, so e.g. "os>=1" is filtered like "os" — previously
    only a bare "==" was handled.
    """
    first = (code or "").splitlines()[:1]
    if not first:
        return []
    m = re.match(r"^\s*#\s*requirements\s*:\s*(.+)$", first[0], re.I)
    if not m:
        return []
    raw = [p.strip() for p in re.split(r"[,\s]+", m.group(1)) if p.strip()]
    stdlibs = {
        "re", "json", "sys", "os", "time", "pathlib", "subprocess", "tempfile",
        "math", "typing", "datetime", "zipfile", "shutil", "hashlib", "hmac",
        "base64", "itertools", "ast", "html", "shlex", "threading",
    }

    def _base(pkg: str) -> str:
        # Package name with any version specifier removed.
        return re.split(r"[=<>!~]", pkg, maxsplit=1)[0]

    return [p for p in raw if _base(p) not in stdlibs][:40]
|
|
|
|
def build_initial_plan(goal: str, fp: CodeFingerprint) -> List[str]:
    """Default repair plan: minimal fixes first, domain preserved.
    NOTE: `goal` is currently unused here — the plan is static apart from
    the domain label."""
    return [
        "最小修复优先:先清理格式错误(如 ``` 标记、缩进、语法)",
        "保持原有结构与领域:不得偏离领域:" + domain_name(fp),
        "仅修改与报错直接相关的行,避免大规模重写",
        "必要时补充缺失依赖;若需 CLI 参数,提供默认值",
    ]
|
|
|
|
def build_error_analysis_prompt(error: str, code: str) -> str:
    """Prompt asking the LLM to triage an error by priority (fence noise,
    formatting, missing CLI args, then logic). Only the first 120 chars of
    `code` are included."""
    return f"""
分析以下错误,按优先级严格执行:
1) 是否包含 Markdown 代码块标记(```python)混入代码?若是,仅移除标记。
2) 是否为编码/缩进/语法格式问题?
3) 是否缺少必要命令行参数?如是,为参数提供默认值或交互输入。
4) 才考虑逻辑问题(保持原功能不变,最小修改)。

错误:
{error}

当前代码(前120字):
{(code or '')[:120]}
"""
|
|
|
|
def build_fix_prompt(
    goal: str,
    original_fp: CodeFingerprint,
    last_err: str,
    current_code: str,
    attempt: int,
    required_keywords: List[str],
    hint: str = "",
) -> str:
    """Prompt for one repair attempt; the strategy escalates with
    `attempt` (1: minimal fixes, 2: deps/defaults, 3+: deep repair).
    NOTE: `current_code` is currently unused in the prompt body."""
    # Escalating strategy text keyed on the attempt number.
    strategy = (
        "最小修改;清理格式/语法;保留原有导入/类/函数;提供缺省参数;"
        if attempt == 1
        else (
            "检查依赖与版本;为必需参数设置默认;必要时加 try/except;"
            if attempt == 2
            else "深度修复,但不得改变核心功能/领域;如需重构,解释重构理由(但最终仅输出代码)。"
        )
    )
    req_kw = (
        ("必须保留/包含以下关键词或结构:" + ", ".join(required_keywords) + "。")
        if required_keywords
        else ""
    )
    hint2 = (hint.strip() + "\n") if hint.strip() else ""
    return f"""
{hint2}
任务目标(保持不变):{goal}
领域约束:{domain_name(original_fp)}。不得将程序改为与领域无关的示例代码。
修复策略(第{attempt}次):{strategy} {req_kw}

若之前报错:
{last_err}

请输出完整可运行的 main.py(只要代码,不要解释)。
若需第三方库,在首行写:# requirements: 包1, 包2
"""
|
|
|
|
def build_reanchor_prompt(goal: str, original_code: str, warning: str, hint: str = "") -> str:
    """Prompt used after a relevance-check failure: re-anchor the LLM to
    the original code (first 4000 chars included) and demand a minimal
    fix rather than a generic rewrite."""
    hint2 = (hint.strip() + "\n") if hint.strip() else ""
    return f"""
{hint2}
警告:你生成的代码偏离了原始任务/领域({warning})。
请基于原始代码进行"最小必要修改"的修复,绝不要替换为通用示例。

原始任务:{goal}
原始代码(片段):
```python
{(original_code or '')[:4000]}
```

请仅输出修复后的完整 main.py(不要包含```标记)。
"""
|
|
|
|
| |
def build_planner_decision_prompt(
    task: str,
    plan: list[str],
    have_code: bool,
    last_err: str,
    last_stdout: str,
    last_stderr: str,
    required_kws: list[str],
    expected_out: str,
) -> str:
    """Prompt asking the planner LLM to pick the next action.

    Expects a JSON-only reply with action ∈ {code, run, reflect, stop}
    (parsed by parse_planner_action). Last-step outputs are truncated to
    600 chars each."""
    plan_text = "\n".join(f"- {p}" for p in (plan or [])) or "(无计划)"
    last_res = ""
    if last_err or last_stdout or last_stderr:
        last_res = f"【上一步结果】\nerr:{(last_err or '')[:600]}\nstdout:{(last_stdout or '')[:600]}\nstderr:{(last_stderr or '')[:600]}"
    req = ", ".join(required_kws or [])
    exp = expected_out or "(未指定)"
    return f"""
你是"规划师(Manager)"。任务:{task}
当前计划:
{plan_text}

是否已有候选代码:{"是" if have_code else "否"}
{last_res}

约束:
- 必须尽量少改代码,先修语法/运行问题,再谈重构
- 必须满足关键词: [{req}](若有)
- 期望 stdout 片段: {exp}

请只输出 JSON,字段:
{{
  "action": "code | run | reflect | stop",
  "reason": "为什么这么做",
  "hints": "给编码专家的具体指示(仅当 action=code 时)"
}}
不要任何解释。
"""
|
|
|
|
def build_planner_reflect_prompt(task: str, old_plan: list[str], last_err: str) -> str:
    """Prompt asking the planner to produce a fresh step-by-step plan after
    the old one hit an error (error truncated to 800 chars).
    NOTE: `task` is currently unused in the prompt body."""
    old = "\n".join(f"- {p}" for p in (old_plan or [])) or "(无)"
    return f"""
我们原计划:
{old}

但遇到错误/阻碍:
{(last_err or '')[:800]}

请你给出新的分步计划(越简越好,每行一步),只输出纯文本每行一个要点。
"""
|
|
|
|
def parse_planner_action(text: str) -> dict:
    """Parse the planner LLM's reply into {action, reason, hints}.

    First tries to extract a JSON object with a valid action; otherwise
    falls back to keyword sniffing (reflect, then run, then stop) and
    finally defaults to "code".
    """
    try:
        import json as _json, re as _re
        match = _re.search(r"\{.*\}", text or "", _re.S)
        if match:
            payload = _json.loads(match.group(0))
            action = (payload.get("action", "") or "").lower().strip()
            if action in {"code", "run", "reflect", "stop"}:
                return {
                    "action": action,
                    "reason": payload.get("reason", ""),
                    "hints": payload.get("hints", ""),
                }
    except Exception:
        pass
    lowered = (text or "").lower()
    for keyword in ("reflect", "run", "stop"):
        if keyword in lowered:
            return {"action": keyword, "reason": text, "hints": ""}
    return {"action": "code", "reason": text, "hints": text}
|
|
|
|
| |
# Fallback planner-team roster; empty provider/model means "use the
# globally selected provider/model".
DEFAULT_TEAM = [
    {"name": "Manager", "persona": "统筹目标,做出行动决策;严控最小修改;优先修复运行/语法问题", "provider": "", "model": ""},
    {"name": "Architect", "persona": "审视结构与依赖,提出稳妥的小步修改建议;避免大改动", "provider": "", "model": ""},
    {"name": "QA", "persona": "基于报错和输出提出验证要求,指出潜在风险与遗漏", "provider": "", "model": ""},
    {"name": "Ops", "persona": "关注依赖、环境、参数与超时等运行因素,提出运行建议", "provider": "", "model": ""},
]
|
|
|
|
def load_team_from_conf() -> Tuple[bool, List[Dict[str, str]], str, int]:
    """Read team-planner settings: (enabled, members, priority, max_conc).
    Missing or empty member lists fall back to DEFAULT_TEAM."""
    conf = load_all()
    enabled = bool(conf.get("planner_team_enabled", False))
    members = conf.get("planner_team_members") or []
    if not isinstance(members, list) or not members:
        members = DEFAULT_TEAM.copy()
    max_conc = int(conf.get("planner_team_max_concurrency", 4))
    priority = conf.get("planner_team_priority", "run>code>reflect>stop")
    return enabled, members, priority, max_conc
|
|
|
|
def save_team_to_conf(enabled: bool, members: List[Dict[str, str]], priority: str, max_conc: int) -> str:
    """Normalize and persist the team-planner configuration.

    Members may arrive as dicts or as [name, provider, model, persona]
    rows (from the UI table); entirely-empty rows are dropped, and an
    empty roster falls back to DEFAULT_TEAM. max_conc is clamped to 1..16.
    Returns the save_json status string."""
    d = load_all()
    d["planner_team_enabled"] = bool(enabled)
    clean = []
    for m in members or []:
        if not isinstance(m, list) and not isinstance(m, dict):
            continue
        if isinstance(m, list):
            if len(m) < 4:
                continue
            if not any(str(x).strip() for x in m):
                continue  # skip all-blank table rows
            clean.append({
                "name": str(m[0]).strip(),
                "provider": str(m[1]).strip(),
                "model": str(m[2]).strip(),
                "persona": str(m[3]).strip(),
            })
        else:
            if not any(str(v).strip() for v in m.values()):
                continue
            clean.append({
                "name": str(m.get("name", "")).strip(),
                "provider": str(m.get("provider", "")).strip(),
                "model": str(m.get("model", "")).strip(),
                "persona": str(m.get("persona", "")).strip(),
            })
    if not clean:
        clean = DEFAULT_TEAM.copy()
    d["planner_team_members"] = clean
    d["planner_team_priority"] = priority or "run>code>reflect>stop"
    d["planner_team_max_concurrency"] = int(max(1, min(16, max_conc or 4)))
    return save_json(d, auto=True)
|
|
|
|
def team_vote_priority_list(pr: str) -> List[str]:
    """Normalize an "a>b>c" priority string into a full ordering over the
    known actions, appending any missing ones in default order."""
    known = ["run", "code", "reflect", "stop"]
    chosen = [p.strip().lower() for p in (pr or "").split(">")]
    ordered = [p for p in chosen if p and p in known]
    ordered.extend(k for k in known if k not in ordered)
    return ordered
|
|
|
|
def aggregate_team_decisions(decisions: List[Dict[str, str]], priority_order: List[str]) -> Dict[str, str]:
    """Majority-vote the team's actions; ties break by `priority_order`.

    Fix: actions absent from `priority_order` no longer raise ValueError
    during tie-breaking — they simply rank after every known action.
    Reasons and hints from all members are concatenated (capped at
    1200/2000 chars).
    """
    def _rank(action) -> int:
        # Unknown actions sort after every known one instead of raising.
        return priority_order.index(action) if action in priority_order else len(priority_order)

    counts: Dict[str, int] = {}
    for d in decisions:
        a = (d.get("action", "") or "").lower().strip()
        if not a:
            continue
        counts[a] = counts.get(a, 0) + 1
    best_action = None
    best_votes = -1
    for a, v in counts.items():
        if v > best_votes:
            best_action = a
            best_votes = v
        elif v == best_votes and _rank(a) < _rank(best_action):
            best_action = a
    if not best_action:
        best_action = priority_order[0]
    reasons = "\n".join(f"[{d.get('by', '?')}] {d.get('reason', '')}" for d in decisions if d.get("reason"))
    hints = "\n".join(f"[{d.get('by', '?')}] {d.get('hints', '')}" for d in decisions if d.get("hints"))
    return {"action": best_action, "reason": reasons[:1200], "hints": hints[:2000]}
| |
| |
class RateLimiter:
    """Sliding-window (per-minute) rate limiter keyed by "provider:model".

    Thread-safety fix: `wait` is invoked concurrently (team decisions and
    batch model tests both run `llm_call` in thread pools), but the bucket
    dict and its deques were mutated without synchronization. Access is now
    guarded by a lock; sleeping happens *outside* the lock so one throttled
    model does not block callers of other providers/models. The capacity is
    also re-checked after each sleep instead of appending unconditionally.
    """

    def __init__(self):
        # key "provider:model" -> deque of recent request timestamps.
        self.buckets: Dict[str, deque] = {}
        self._lock = threading.Lock()

    def wait(self, provider: str, model: str):
        """Block until a request for (provider, model) fits the saved RPM."""
        rpm = get_saved_rpm(provider, model)
        # Gemini 1.5 Pro with no explicit RPM gets a conservative default
        # (free tier is heavily limited) — preserved from the original.
        if rpm == 0 and provider == "gemini" and "1.5" in (model or "").lower() and "pro" in (model or "").lower():
            rpm = 5
        if rpm <= 0:
            return
        key = f"{provider}:{model}"
        while True:
            with self._lock:
                dq = self.buckets.setdefault(key, deque())
                now = time.time()
                # Drop timestamps older than the 60s window.
                while dq and now - dq[0] > 60:
                    dq.popleft()
                if len(dq) < rpm:
                    dq.append(time.time())
                    return
                sleep_sec = 60 - (now - dq[0])
            if sleep_sec > 0:
                debug("RL", "速率限制等待", {"key": key, "sleep": round(sleep_sec, 2)})
                time.sleep(sleep_sec)
            else:
                # Window edge: yield briefly and re-check.
                time.sleep(0.01)
|
|
|
|
# Module-wide limiter shared by every llm_call invocation (including thread pools).
RATE_LIMITER = RateLimiter()
|
|
|
|
| def _parse_retry_seconds(text: str) -> int: |
| try: |
| if not text: |
| return 10 |
| for pat in [ |
| r"retry_delay\s*\{\s*seconds:\s*(\d+)", |
| r"retry[- ]?after[^0-9]*(\d+)", |
| r"after\s+(\d+)\s*second", |
| ]: |
| m = re.search(pat, text, re.I) |
| if m: |
| return max(1, int(m.group(1))) |
| except Exception: |
| pass |
| return 10 |
|
|
|
|
def llm_call(
    provider: str,
    model: str,
    prompt: str,
    keys: dict,
    req_timeout: int = DEFAULT_REQ_TIMEOUT,
) -> str:
    """Send `prompt` to the chosen provider/model and return the reply text.

    Failures are reported in-band as strings starting with "❗" rather than
    raised, so callers can surface them directly in the UI (quick_test_one
    relies on this convention to detect broken models).

    Providers handled: "mock" (offline; echoes the first ```python fence in
    the prompt), "gemini" (SDK with a one-shot 429 retry), "anthropic",
    "azure", "hf", plus anything in OPENAI_LIKE or the user's custom
    OpenAI-compatible endpoints. Per-model RPM throttling is applied via
    RATE_LIMITER before any network call.
    """
    t0 = time.time()
    debug("LLM", "调用开始", {"provider": provider, "model": model, "prompt_len": len(prompt), "timeout": req_timeout})
    try:
        # Mock provider: no network — return the first fenced python block
        # from the prompt so the pipeline can be exercised offline.
        if provider == "mock":
            m = re.findall(r"```python\s*(.*?)```", prompt, re.S | re.I)
            return m[0].strip() if m else "print('Hello from mock LLM')"


        RATE_LIMITER.wait(provider, model)
        custom = get_custom_providers()
        is_custom = provider in custom


        # --- Google Gemini (official SDK) ---
        if provider == "gemini":
            key = keys.get("gemini_key") or os.getenv("GOOGLE_API_KEY")
            if not key:
                return "❗ 请配置 GOOGLE_API_KEY"
            try:
                import google.generativeai as genai
                try:
                    from google.api_core.exceptions import ResourceExhausted as GE_ResourceExhausted
                except Exception:
                    # Fallback stub so the except-clause below stays valid
                    # even when google-api-core is absent.
                    class GE_ResourceExhausted(Exception): ...


                genai.configure(api_key=key)
                mdl = genai.GenerativeModel(model)
                try:
                    resp = mdl.generate_content(prompt)
                except GE_ResourceExhausted as e:
                    # Quota hit: honor the server-suggested delay once, then retry.
                    wait_s = _parse_retry_seconds(str(e))
                    debug("RL", "Gemini 429 等待重试", {"wait": wait_s})
                    time.sleep(wait_s + 2)
                    resp = mdl.generate_content(prompt)
                text = ""
                try:
                    text = resp.text
                except Exception:
                    # resp.text raises when the response was blocked/empty;
                    # stitch the candidate parts together manually instead.
                    cands = getattr(resp, "candidates", []) or []
                    if cands and getattr(cands[0], "content", None):
                        parts = getattr(cands[0].content, "parts", []) or []
                        text = "".join([getattr(p, "text", "") for p in parts if hasattr(p, "text")])
                debug("LLM_DONE", "Gemini响应", {"elapsed": round(time.time() - t0, 3), "len": len(text or "")})
                return text or "❗ Gemini异常:无有效文本返回"
            except ImportError:
                return "❗ 未安装 google-generativeai"
            except Exception as e:
                debug("LLM_ERR", "Gemini调用异常", {"err": str(e), "trace": traceback.format_exc()})
                return f"❗ Gemini异常:{e}"


        # --- Anthropic Messages API (raw HTTP) ---
        if provider == "anthropic":
            key = keys.get("anthropic_key")
            if not key:
                return "❗ 请配置 ANTHROPIC_API_KEY"
            url = "https://api.anthropic.com/v1/messages"
            headers = {
                "x-api-key": key,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            }
            payload = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 4096,
                "temperature": 0.2,
            }
            r = requests.post(url, headers=headers, json=payload, timeout=req_timeout)
            debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)})
            if r.status_code != 200:
                return f"❗ Anthropic错误 {r.status_code}:{r.text[:400]}"
            data = r.json()
            blocks = data.get("content", [])
            if isinstance(blocks, list) and blocks:
                return blocks[0].get("text", "")
            return json.dumps(data, ensure_ascii=False)


        # --- Azure OpenAI (deployment-based URL) ---
        if provider == "azure":
            key = keys.get("azure_key")
            base = keys.get("azure_base") or DEFAULT_BASES["azure"]
            deployment = keys.get("azure_deployment")
            version = keys.get("azure_version", "2024-02-15-preview")
            if not all([key, base, deployment]):
                return "❗ 请配置 Azure OpenAI 的 Key/Base/Deployment"
            url = f"{base.rstrip('/')}/openai/deployments/{deployment}/chat/completions?api-version={version}"
            headers = {"api-key": key, "Content-Type": "application/json"}
            payload = {
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.2,
                "max_tokens": 4096,
            }
            r = requests.post(url, headers=headers, json=payload, timeout=req_timeout)
            debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)})
            if r.status_code != 200:
                return f"❗ Azure错误 {r.status_code}:{r.text[:400]}"
            data = r.json()
            return data.get("choices", [{}])[0].get("message", {}).get("content", json.dumps(data, ensure_ascii=False))


        # --- HuggingFace Inference API ---
        if provider == "hf":
            token = keys.get("hf_token")
            if not token:
                return "❗ 请配置 HF_TOKEN"
            url = HF_API_URL + model
            headers = {"Authorization": f"Bearer {token}"}
            payload = {"inputs": prompt, "parameters": {"max_new_tokens": 2048, "temperature": 0.2}}
            r = requests.post(url, headers=headers, json=payload, timeout=req_timeout)
            debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)})
            if r.status_code != 200:
                return f"❗ HuggingFace错误 {r.status_code}:{r.text[:400]}"
            data = r.json()
            if isinstance(data, list) and data:
                return data[0].get("generated_text", str(data))
            return json.dumps(data, ensure_ascii=False)


        # --- OpenAI-compatible chat/completions (built-in or custom base) ---
        if provider in OPENAI_LIKE or is_custom:
            if is_custom:
                cinfo = custom.get(provider, {})
                base = cinfo.get("base", "")
                key = cinfo.get("key", "")
                extra = {}
                # OpenRouter-style attribution headers, if configured.
                if cinfo.get("referer"):
                    extra["HTTP-Referer"] = cinfo["referer"]
                if cinfo.get("title"):
                    extra["X-Title"] = cinfo["title"]
            else:
                base = keys.get(f"{provider}_base") or DEFAULT_BASES.get(provider, "")
                key = keys.get(f"{provider}_key")
                extra = {}
                if provider == "openrouter":
                    if keys.get("openrouter_referer"):
                        extra["HTTP-Referer"] = keys["openrouter_referer"]
                    if keys.get("openrouter_title"):
                        extra["X-Title"] = keys["openrouter_title"]
            if not base or not key:
                return f"❗ 请配置 {provider.upper()} Base/Key"
            url = base.rstrip("/") + "/chat/completions"
            headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra}
            payload = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.2,
                "max_tokens": 4096,
            }
            r = requests.post(url, headers=headers, json=payload, timeout=req_timeout)
            if r.status_code == 429:
                # One retry: prefer the Retry-After header, fall back to
                # parsing the body, then add a small safety margin.
                ra = r.headers.get("Retry-After", "")
                try:
                    wait_s = max(1, int(float(ra))) if ra else _parse_retry_seconds(r.text)
                except Exception:
                    wait_s = _parse_retry_seconds(r.text)
                debug("RL", "429 等待后重试", {"wait": wait_s, "url": url})
                time.sleep((wait_s or 10) + 2)
                r = requests.post(url, headers=headers, json=payload, timeout=req_timeout)
            debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3), "len": len(r.text or "")})
            if r.status_code == 401:
                return "❗ 401 未授权"
            if r.status_code != 200:
                return f"❗ API错误 {r.status_code}:{r.text[:400]}"
            data = r.json()
            return data.get("choices", [{}])[0].get("message", {}).get("content", json.dumps(data, ensure_ascii=False))


        return "❗ 未支持的提供商"
    except Exception as e:
        # Network errors, JSON decode failures, etc. all surface in-band.
        debug("LLM_ERR", "请求异常", {"err": str(e), "trace": traceback.format_exc()})
        return f"❗ 请求异常:{str(e)}"
|
|
|
|
| |
def get_models_list(provider: str, keys: dict):
    """Fetch the model list for a provider, with cache/recommended fallback.

    Returns (models_to_use, recommended, err):
      - on a successful online fetch the de-duplicated sorted list is cached
        via set_models_cache and returned;
      - otherwise the cached list (get_models_cache) or the static
        recommendations are returned, with `err` describing the failure.
    Only custom providers, OPENAI_LIKE providers and "gemini" can be fetched
    online; anything else simply falls through to cache/recommendations.
    """
    err = ""
    rec = RECOMMENDED_MODELS.get(provider, [])
    fetched = []
    try:
        custom = get_custom_providers()
        # Custom OpenAI-compatible endpoint: GET {base}/models.
        if provider in custom:
            base = custom[provider].get("base", "")
            key = custom[provider].get("key", "")
            if base and key:
                r = requests.get(f"{base.rstrip('/')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=20)
                if r.status_code == 200:
                    data = r.json()
                    # Accept both {"data": [...]} and a bare list of model dicts.
                    fetched = [m.get("id") for m in (data.get("data") or data) if isinstance(m, dict) and m.get("id")]
                else:
                    err = f"自定义API {r.status_code}: {r.text[:120]}"
            else:
                err = "请先在自定义API中配置 Base URL 和 Key"
        # Built-in OpenAI-compatible providers: same /models endpoint shape.
        elif provider in OPENAI_LIKE:
            base = keys.get(f"{provider}_base") or DEFAULT_BASES.get(provider, "")
            key = keys.get(f"{provider}_key")
            if base and key:
                r = requests.get(f"{base.rstrip('/')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=20)
                if r.status_code == 200:
                    data = r.json()
                    fetched = [m.get("id") for m in (data.get("data") or data) if isinstance(m, dict) and m.get("id")]
                else:
                    err = f"API {r.status_code}: {r.text[:120]}"
            else:
                err = "请先配置 API Key 和 Base URL"
        # Gemini: enumerate via the official SDK.
        elif provider == "gemini":
            key = keys.get("gemini_key") or os.getenv("GOOGLE_API_KEY")
            if key:
                try:
                    import google.generativeai as genai
                    genai.configure(api_key=key)
                    # SDK names come back as "models/<id>"; strip the prefix.
                    fetched = [getattr(m, "name", "").replace("models/", "") for m in genai.list_models() if getattr(m, "name", "")]
                except ImportError:
                    err = "未安装 google-generativeai"
            else:
                err = "请先配置 GOOGLE_API_KEY"
    except Exception as e:
        err = f"异常: {e}"
    api_success = (err == "") and bool(fetched)
    models_to_use = sorted(set(fetched)) if api_success else (get_models_cache(provider) or rec)
    if api_success:
        set_models_cache(provider, models_to_use)
    debug("MODELS", "获取模型列表", {"provider": provider, "count": len(models_to_use), "err": err})
    return models_to_use, rec, err
|
|
|
|
def refresh_models(provider: str, current_model: str, keys: dict):
    """Refresh the model dropdown for a provider.

    Keeps the current selection when set; otherwise picks the first
    recommended model present in the list (or the first model, or "").
    Returns (dropdown update, info text, placeholder table, newline list).
    """
    models, rec, err = get_models_list(provider, keys)
    prefix = f"⚠️ {err}\n" if err else ""
    source = '缓存/推荐' if err else '在线/缓存'
    info = prefix + f"ℹ️ 共 {len(models)} 个模型(列表来自 {source})"
    if current_model:
        value = current_model
    else:
        value = next((m for m in rec if m in models), models[0] if models else "")
    dropdown = gr.update(choices=models, value=value)
    return (dropdown, info, [["-", "-", "-"]], "\n".join(models or []))
|
|
|
|
def quick_test_one(provider: str, model: str, keys: dict) -> bool:
    """Ping a model with a trivial prompt; cache and return reachability.

    A model counts as OK when llm_call returns a non-empty reply that does
    not start with the in-band error marker "❗". Results are memoized in
    MODEL_OK_CACHE so repeated batch runs stay cheap.
    """
    cache_key = f"{provider}:{model}"
    if cache_key in MODEL_OK_CACHE:
        return MODEL_OK_CACHE[cache_key]
    ok = False
    try:
        reply = llm_call(provider, model, "请仅回复:pong", keys, req_timeout=12)
        ok = bool(reply) and not str(reply).startswith("❗")
        debug("DETECT", "模型检测结果", {"key": cache_key, "ok": ok})
    except Exception as e:
        ok = False
        debug("DETECT_ERR", "检测异常", {"key": cache_key, "err": str(e), "trace": traceback.format_exc()})
    MODEL_OK_CACHE[cache_key] = ok
    return ok
|
|
|
|
def test_models(provider: str, keys: dict):
    """Probe every known model of a provider concurrently.

    Returns (table rows [model, ⭐?, ✅/❌/❓], summary string, ok_models).
    Worker count is capped at 16, defaults to 8, and is further limited by
    the saved RPM for the provider so probing does not trip rate limits.

    NOTE(review): the `keys` parameter is accepted but ignored — both calls
    below use load_all() instead. Confirm whether that is intentional.
    """
    t0 = time.time()
    models = get_models_cache(provider)
    if not models:
        models, _, _ = get_models_list(provider, load_all())
    rec = RECOMMENDED_MODELS.get(provider, [])
    if not models:
        return [["-", "-", "-"]], "❌ 未获取到模型列表", []
    rows = []
    max_workers = min(16, len(models))
    default_workers = 8
    # RPM of the first model is used as a proxy for the provider's limit.
    rpm_hint = get_saved_rpm(provider, models[0] if models else "")
    max_workers = max(2, min(max_workers, rpm_hint)) if rpm_hint and rpm_hint > 0 else min(max_workers, default_workers)
    debug("DETECT", "开始批量检测", {"provider": provider, "count": len(models), "workers": max_workers})
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        fut = {ex.submit(quick_test_one, provider, m, load_all()): m for m in models}
        for f in concurrent.futures.as_completed(fut):
            m = fut[f]
            try:
                ok = f.result()
            except Exception:
                ok = False
            rows.append([m, "⭐" if m in rec else "", "✅" if ok else "❌"])
    # Re-order results to match the original model list (completion order is arbitrary).
    status_map = {r[0]: r[2] for r in rows}
    ordered = [[m, "⭐" if m in rec else "", status_map.get(m, "❓")] for m in models]
    ok_models = [m for m in models if status_map.get(m) == "✅"]
    dt = time.time() - t0
    debug("DETECT", "批量检测完成", {"provider": provider, "ok": len(ok_models), "elapsed": round(dt, 2)})
    info = f"✅ 已检测 {len(models)} 个模型。通过: {len(ok_models)} 并发: {max_workers} ⏱ {dt:.2f}s"
    return ordered, info, ok_models
|
|
|
|
| |
def static_validate(code: str) -> Tuple[bool, str]:
    """Check that candidate code is non-empty and syntactically valid.

    Returns (True, "") on success, otherwise (False, reason).
    """
    if not (code or "").strip():
        return False, "代码为空"
    ok, msg = compile_syntax_ok(code)
    return (True, "") if ok else (False, msg)
|
|
|
|
def dynamic_validate(rc: int, out: str, err: str) -> Tuple[bool, str]:
    """Judge a sandbox run by its exit code.

    Exit code 0 passes. Otherwise the message reports the code, and notes
    when stderr contains the common Python crash markers.
    """
    if rc == 0:
        return True, ""
    stderr_text = err or ""
    message = f"进程退出码 {rc}"
    if any(marker in stderr_text for marker in ("Traceback", "Error", "Exception")):
        message += "(检测到异常)"
    return False, message
|
|
|
|
def semantic_validate(
    original_fp: CodeFingerprint,
    code: str,
    required_keywords: List[str],
    expected_stdout_contains: str,
) -> Tuple[bool, str]:
    """Verify a candidate stays on-domain and contains required keywords.

    Compares the candidate's fingerprint against the original via
    relevance_check, then does a case-insensitive substring check for each
    required keyword. `expected_stdout_contains` is accepted for interface
    symmetry; stdout is only checked after an actual run, not here.
    """
    try:
        candidate_fp = extract_fingerprint(code or "")
        relevant, why = relevance_check(original_fp, candidate_fp)
        if not relevant:
            return False, f"候选代码可能偏离原始领域/结构:{why}"
        lowered = (code or "").lower()
        for keyword in required_keywords or []:
            if keyword and keyword.lower() not in lowered:
                return False, f"缺少必须关键词/结构:{keyword}"
        return True, "通过"
    except Exception as e:
        return False, f"语义校验异常:{e}"
|
|
|
|
def run_in_sandbox(code: str, attach_paths: List[str], cli_args: str, timeout: int) -> Tuple[int, str, str, str, str]:
    """Execute candidate code in a throwaway working directory.

    Layout: <tmp>/pyzdh_runs/<timestamp>-<sha1[:8]>/ containing main.py,
    inputs/ (copies of attachments), optional .pip_packages (pip --target
    installs for requirements parsed from the code), pip.log, stdout.txt
    and stderr.txt. Returns (rc, stdout, stderr, pip_log, workdir);
    rc 998 signals an internal sandbox failure. Sets the module-global
    CURRENT_RUN_DIR as a side effect and honors the STOP_FLAG global.
    """
    global CURRENT_RUN_DIR
    pip_target = ""
    try:
        # Temp-dir root avoids the container permission problem on /app/runs.
        temp_dir = Path(tempfile.gettempdir()) / "pyzdh_runs"
        temp_dir.mkdir(parents=True, exist_ok=True)


        # Run id: timestamp plus a short hash of the code prefix for uniqueness.
        rid = datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + hashlib.sha1(code[:2000].encode("utf-8")).hexdigest()[:8]
        workdir = str(temp_dir / rid)
        Path(workdir).mkdir(parents=True, exist_ok=True)
        CURRENT_RUN_DIR = workdir


        # Write the candidate program.
        main_py = Path(workdir) / "main.py"
        main_py.write_text(code, "utf-8")


        # Copy attachments into inputs/; per-file failures are only logged.
        in_dir = Path(workdir) / "inputs"
        in_dir.mkdir(exist_ok=True)
        for pth in attach_paths or []:
            try:
                if os.path.isfile(pth):
                    shutil.copy2(pth, in_dir / Path(pth).name)
            except Exception as e:
                try:
                    debug("WARN", f"复制附件失败: {pth}", {"error": str(e)})
                except Exception:
                    pass


        # Install declared requirements into a run-local --target directory.
        pip_log = ""
        pip_log_path = Path(workdir) / "pip.log"
        reqs = parse_requirements(code)
        if reqs:
            pip_target = str(Path(workdir) / ".pip_packages")
            cmd = [sys.executable, "-m", "pip", "install", "--target", pip_target, "-U", *reqs]
            try:
                env = os.environ.copy()
                env["PIP_TARGET"] = pip_target
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
                _register_proc(p)
                # pip gets 5x the run timeout, clamped to [60, 900] seconds.
                out, err = p.communicate(timeout=min(900, max(60, int(timeout or 40) * 5)))
                pip_log = (out or "") + "\n" + (err or "")
            except Exception as e:
                pip_log = f"pip 安装异常: {e}\n"
            try:
                pip_log_path.write_text(pip_log, "utf-8")
            except Exception:
                pass


        # Launch the program with optional CLI args and the local packages on PYTHONPATH.
        env = os.environ.copy()
        if pip_target and Path(pip_target).exists():
            env["PYTHONPATH"] = pip_target
        argv = [sys.executable, str(main_py)]
        if (cli_args or "").strip():
            try:
                argv += shlex.split(cli_args)
            except Exception:
                # Unparseable quoting: pass the raw string as a single argument.
                argv += [cli_args]
        try:
            debug("RUN", "开始执行", {"argv": argv, "cwd": workdir})
        except Exception:
            pass
        p = subprocess.Popen(argv, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
        _register_proc(p)


        # Poll loop: stream output line-by-line, honoring STOP_FLAG and the deadline.
        # NOTE(review): readline() on a pipe can block while the child is
        # silent, which may delay STOP_FLAG/deadline handling — confirm
        # whether non-blocking reads or reader threads are needed here.
        deadline = time.time() + int(timeout or 40)
        out_lines = []
        err_lines = []
        while True:
            if STOP_FLAG:
                try:
                    p.terminate()
                    time.sleep(0.1)
                    if p.poll() is None:
                        p.kill()
                except Exception:
                    pass
                break
            if p.stdout:
                line = p.stdout.readline()
                if line:
                    out_lines.append(line)
            if p.stderr:
                el = p.stderr.readline()
                if el:
                    err_lines.append(el)
            if p.poll() is not None:
                # Process exited: drain whatever output remains.
                try:
                    ro, re_ = p.communicate(timeout=0.2)
                    if ro:
                        out_lines.append(ro)
                    if re_:
                        err_lines.append(re_)
                except Exception:
                    pass
                break
            if time.time() > deadline:
                try:
                    p.terminate()
                    time.sleep(0.1)
                    if p.poll() is None:
                        p.kill()
                except Exception:
                    pass
                err_lines.append(f"\n[执行超时, 超时={timeout}s]")
                break
            time.sleep(0.01)


        # Persist captured output next to the run for later packaging.
        out = "".join(out_lines)
        err = "".join(err_lines)
        rc = p.poll() if p else 0
        try:
            (Path(workdir) / "stdout.txt").write_text(out, "utf-8")
            (Path(workdir) / "stderr.txt").write_text(err, "utf-8")
        except Exception:
            pass
        return rc, out, err, pip_log, workdir


    except Exception as e:
        # Internal sandbox failure (not the candidate's fault): rc 998.
        error_msg = f"run_in_sandbox 异常: {e}"
        try:
            debug("RUN_ERR", "沙箱执行失败", {"error": error_msg, "workdir": CURRENT_RUN_DIR})
        except Exception:
            pass
        return 998, "", error_msg, "", CURRENT_RUN_DIR or ""
|
|
|
|
def package_run_dir(run_dir: str) -> str:
    """Zip an entire run directory into LOGS_DIR.

    Returns the zip path, or "" when the directory is missing or packaging
    fails (the failure is logged, not raised).
    """
    try:
        if not run_dir or not os.path.isdir(run_dir):
            return ""
        base = Path(run_dir)
        zpath = str(Path(LOGS_DIR) / f"package-{base.name}.zip")
        with zipfile.ZipFile(zpath, "w", zipfile.ZIP_DEFLATED) as zf:
            for fp in base.rglob("*"):
                if fp.is_file():
                    zf.write(str(fp), arcname=str(fp.relative_to(base)))
        return zpath
    except Exception as e:
        debug("ZIP", "打包运行目录失败", {"err": str(e)})
        return ""
|
|
|
|
def package_all_logs(current_run_dir: str = "") -> str:
    """Bundle every *.log in LOGS_DIR into a timestamped zip.

    When `current_run_dir` names an existing directory, its files are added
    too under a "run_dir/" prefix. Returns the zip path, or "" on failure.
    """
    try:
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        zpath = str(Path(LOGS_DIR) / f"all-logs-{stamp}.zip")
        with zipfile.ZipFile(zpath, "w", zipfile.ZIP_DEFLATED) as zf:
            for log_file in Path(LOGS_DIR).glob("*.log"):
                zf.write(str(log_file), arcname=log_file.name)
            if current_run_dir and os.path.isdir(current_run_dir):
                base = Path(current_run_dir)
                for fp in base.rglob("*"):
                    if fp.is_file():
                        zf.write(str(fp), arcname=os.path.join("run_dir", str(fp.relative_to(base))))
        return zpath
    except Exception as e:
        debug("ZIP", "打包日志失败", {"err": str(e)})
        return ""
|
|
|
|
| |
def orchestrate(**kwargs) -> dict:
    """Main fix-loop: plan → (run | code | reflect | stop) until success.

    Reads all configuration from kwargs (task, provider/model, keys, files,
    attempt/timeout limits, required keywords, expected stdout, CLI args,
    baseline code, and the team-voting options). Each attempt asks the
    planner — a single model, or a voting team — for the next action:
    "run" executes the current code in the sandbox, "code" asks the coder
    model for a fix (with static + semantic validation), "reflect" rebuilds
    the plan, "stop" ends the loop. Returns a result dict with the final
    code, status, captured output, and packaged artifacts.
    """
    global STOP_FLAG
    STOP_FLAG = False
    task = kwargs.get("task", "")
    global_hint = kwargs.get("global_hint", "")
    provider = kwargs.get("provider", "gemini")
    model = kwargs.get("model", "gemini-1.5-flash")
    keys = kwargs.get("keys", {})
    files = kwargs.get("files", [])
    max_attempts = kwargs.get("max_attempts", 3)
    timeout = kwargs.get("timeout", 40)
    required_kws_text = kwargs.get("required_kws_text", "")
    expected_out_contains = kwargs.get("expected_out_contains", "")
    cli_args = kwargs.get("cli_args", "")
    baseline_code = kwargs.get("baseline_code", "")


    # Team-voting configuration (multiple planner personas vote on the action).
    team_enabled = kwargs.get("team_enabled", False)
    team_members = kwargs.get("team_members", [])
    team_priority = kwargs.get("team_priority", "run>code>reflect>stop")
    team_max_conc = kwargs.get("team_max_conc", 4)


    # Loop state.
    current_code = ""
    original_fp = CodeFingerprint()
    last_err, last_stdout, last_stderr = "", "", ""
    workdir = ""
    status = "开始"


    # Seed the working code: explicit baseline wins, else the first .py
    # attachment (or the first attachment of any kind).
    if (baseline_code or "").strip():
        current_code = baseline_code
    elif files:
        py_files = [f for f in files if str(f).endswith((".py", ".pyw"))]
        target = py_files[0] if py_files else files[0]
        try:
            current_code = Path(target).read_text("utf-8")
        except Exception as e:
            return {"status": f"❌ 读取初始文件失败: {e}", "logs": get_debug_text()}


    if not (current_code or "").strip() and not (task or "").strip():
        return {"status": "❌ 任务和初始代码均为空", "logs": get_debug_text()}


    original_fp = extract_fingerprint(current_code)
    plan = build_initial_plan(task, original_fp)
    req_kws = [k.strip() for k in (required_kws_text or "").split(",") if k.strip()]


    # NOTE(review): if max_attempts < 1 this loop never runs and `attempt`
    # is unbound at the final return — confirm the UI guarantees >= 1.
    for attempt in range(1, max_attempts + 1):
        if STOP_FLAG:
            status = "⏹️ 用户手动停止"
            break


        debug("ORCH", f"第 {attempt}/{max_attempts} 次尝试", {"plan": plan})


        planner_pvd = provider
        planner_mdl = model


        if team_enabled:
            debug("PLAN", "团队决策模式", {"members": len(team_members), "conc": team_max_conc})
            decisions = []


            def get_team_decision(member: dict):
                # Each member may override provider/model; the persona line
                # is prepended to the shared decision prompt.
                p = member.get("provider") or provider
                m = member.get("model") or model
                persona_prompt = f"你是 {member.get('name', '成员')},你的职责是:{member.get('persona', '规划')}\n"
                prompt = persona_prompt + build_planner_decision_prompt(
                    task, plan, bool(current_code), last_err, last_stdout, last_stderr, req_kws, expected_out_contains
                )
                resp = llm_call(p, m, prompt, keys)
                action_parsed = parse_planner_action(resp)
                action_parsed['by'] = member.get('name', '?')
                return action_parsed


            with concurrent.futures.ThreadPoolExecutor(max_workers=team_max_conc) as executor:
                futures = [executor.submit(get_team_decision, member) for member in team_members]
                for future in concurrent.futures.as_completed(futures):
                    try:
                        decisions.append(future.result())
                    except Exception as e:
                        debug("TEAM_ERR", "获取团队成员决策异常", {"err": str(e)})


            action = aggregate_team_decisions(decisions, team_vote_priority_list(team_priority))


        else:
            # Single-planner mode.
            prompt = build_planner_decision_prompt(
                task, plan, bool(current_code), last_err, last_stdout, last_stderr, req_kws, expected_out_contains
            )
            resp = llm_call(planner_pvd, planner_mdl, prompt, keys)
            action = parse_planner_action(resp)


        debug("PLAN", "规划结果", {"action": action.get("action"), "reason": action.get("reason")})


        # Dispatch the planner's chosen action.
        act = action.get("action")
        if act == "stop":
            status = f"ℹ️ 规划师决定停止:{action.get('reason', '')}"
            break


        elif act == "reflect":
            # Rebuild the plan from a reflection prompt, one line per step.
            prompt = build_planner_reflect_prompt(task, plan, last_err)
            new_plan_text = llm_call(planner_pvd, planner_mdl, prompt, keys)
            plan = [p.strip() for p in new_plan_text.splitlines() if p.strip()]
            continue


        elif act == "run":
            if not current_code:
                last_err = "代码为空,无法运行"
                continue
            rc, out, err, pip_log, w_dir = run_in_sandbox(current_code, files, cli_args, timeout)
            workdir = w_dir
            last_stdout, last_stderr = out, err
            ok, msg = dynamic_validate(rc, out, err)
            # Success requires a clean exit plus the expected stdout marker (if any).
            if ok and (not expected_out_contains or expected_out_contains in out):
                status = f"✅ 成功 (第 {attempt} 次)"
                break
            else:
                # Feed the tails of stdout/stderr/pip.log back to the planner.
                last_err = f"{msg}\nSTDOUT:\n{out[-1000:]}\nSTDERR:\n{err[-1000:]}\nPIP.LOG:\n{pip_log[-1000:]}"


        elif act == "code":
            coder_hint = action.get("hints", "")
            prompt = build_fix_prompt(task, original_fp, last_err, current_code, attempt, req_kws, global_hint + "\n" + coder_hint)
            new_code_raw = llm_call(provider, model, prompt, keys)


            # In-band "❗" marker means the LLM call itself failed.
            if str(new_code_raw).startswith("❗"):
                last_err = f"生成代码失败: {new_code_raw}"
                continue


            new_code = strip_code_fences(new_code_raw)
            ok, msg = static_validate(new_code)
            if not ok:
                last_err = f"生成了无效代码: {msg}"
                continue


            # Off-domain code gets one re-anchor retry (its result is accepted
            # without re-validation — see NOTE below).
            ok, msg = semantic_validate(original_fp, new_code, req_kws, expected_out_contains)
            if not ok:
                last_err = f"代码偏离目标: {msg}"
                prompt_reanchor = build_reanchor_prompt(task, current_code, msg, global_hint)
                new_code_raw = llm_call(provider, model, prompt_reanchor, keys)
                new_code = strip_code_fences(new_code_raw)


            # NOTE(review): the re-anchored code is adopted without another
            # static/semantic validation pass — confirm this is intended.
            current_code = new_code
            last_err = ""


        else:
            # NOTE(review): this fires for an UNKNOWN action, yet the message
            # says "max attempts reached"; it looks like it was meant to be
            # the for-loop's `else` clause — confirm intent before changing.
            status = f"❌ 达到最大尝试次数 ({max_attempts})"


    # Package artifacts from the last run directory (if any attempt ran).
    zip_path = package_run_dir(workdir) if workdir else ""
    main_path = str(Path(workdir) / "main.py") if workdir else ""


    return {
        "code": current_code,
        "status": status,
        "attempts": attempt,
        "stdout": last_stdout,
        "stderr": last_stderr,
        "download_main": main_path,
        "zip_path": zip_path,
        "workdir": workdir,
        "logs": get_debug_text(),
    }
| |
| |
| class _MCPManager: |
| def __init__(self, root: str): |
| self.root = Path(root) |
| self.root.mkdir(parents=True, exist_ok=True) |
|
|
| def install(self, tool_id: str) -> str: |
| try: |
| td = self.root / str(tool_id) |
| td.mkdir(parents=True, exist_ok=True) |
| (td / "installed.txt").write_text(datetime.now().isoformat(), "utf-8") |
| return f"✅ 已安装: {tool_id}" |
| except Exception as e: |
| return f"❌ 安装失败: {e}" |
|
|
| def uninstall(self, tool_id: str) -> str: |
| try: |
| td = self.root / str(tool_id) |
| if td.exists(): |
| shutil.rmtree(td) |
| return f"✅ 已卸载: {tool_id}" |
| except Exception as e: |
| return f"❌ 卸载失败: {e}" |
|
|
| def save_config(self, tool_id: str, config_json_str: str) -> str: |
| try: |
| td = self.root / str(tool_id) |
| td.mkdir(parents=True, exist_ok=True) |
| (td / "config.json").write_text(config_json_str or "{}", "utf-8") |
| return f"✅ 已保存配置: {tool_id}" |
| except Exception as e: |
| return f"❌ 配置保存失败: {e}" |
|
|
|
|
# Shared tool-manager instance rooted at TOOLS_ROOT.
mcp = _MCPManager(TOOLS_ROOT)
|
|
|
|
| |
def _project_path(name: str) -> Path:
    """Path of a managed project folder under PROJECT_ROOT."""
    return Path(PROJECT_ROOT).joinpath(name)
|
|
|
|
def list_projects() -> List[str]:
    """Names of directories under PROJECT_ROOT that are git repositories."""
    try:
        root = Path(PROJECT_ROOT)
        return [
            entry.name
            for entry in sorted(root.glob("*"))
            if entry.is_dir() and (entry / ".git").exists()
        ]
    except Exception:
        return []
|
|
|
|
def list_branches(project_name: str) -> List[str]:
    """Local branch names of a project's repo; [] when blank or on any error."""
    if not (project_name or "").strip():
        return []
    try:
        repo = Repo(str(_project_path(project_name)))
        return [head.name for head in repo.heads]
    except Exception:
        return []
|
|
|
|
def list_files_in_project(project_name: str) -> List[List[str]]:
    """Tabulate project files as [relative path, size, mtime] rows.

    Files under .git are skipped, per-file stat failures produce a
    placeholder row, and output is capped at 1000 rows. A placeholder
    table [["-", "-", "-"]] is returned when there is nothing to show.
    """
    placeholder = [["-", "-", "-"]]
    try:
        if not (project_name or "").strip():
            return placeholder
        root = _project_path(project_name)
        rows: List[List[str]] = []
        for entry in sorted(root.rglob("*")):
            if not entry.is_file() or ".git" in str(entry):
                continue
            try:
                st = entry.stat()
                stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(st.st_mtime))
                rows.append([str(entry.relative_to(root)), f"{st.st_size//1024}KB", stamp])
            except Exception:
                rows.append([str(entry), "?", "?"])
        return rows[:1000] or placeholder
    except Exception:
        return placeholder
|
|
|
|
def read_file_content(project_name: str, file_rel_path: str) -> str:
    """Read a project file as UTF-8 (undecodable bytes replaced).

    Returns "" when the file does not exist, or an error message string
    when reading fails.
    """
    try:
        fp = _project_path(project_name) / file_rel_path
        if not fp.exists():
            return ""
        return fp.read_text("utf-8", errors="replace")
    except Exception as e:
        return f"读取失败: {e}"
|
|
|
|
def save_file_content(project_name: str, file_rel_path: str, content: str) -> str:
    """Write (or create) a project file as UTF-8, creating parent dirs."""
    try:
        fp = _project_path(project_name) / file_rel_path
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content or "", "utf-8")
        return "✅ 已保存文件"
    except Exception as e:
        return f"❌ 保存失败: {e}"
|
|
|
|
def pull_github_repo(url: str, token: str) -> str:
    """Clone a repo into PROJECT_ROOT, or pull it when already present.

    An optional access token is injected into https clone URLs for private
    repositories. The destination folder name is derived from the URL path
    (with a timestamp fallback when the path yields nothing).
    """
    try:
        if not (url or "").strip():
            return "❌ 请输入仓库 URL"
        clone_url = url.strip()
        if token and clone_url.startswith("https://"):
            clone_url = re.sub(r"^https://", f"https://{token}@", clone_url)
        repo_name = Path(urllib.parse.urlparse(url).path).name.replace(".git", "")
        if not repo_name:
            repo_name = "repo-" + datetime.now().strftime("%H%M%S")
        dest = _project_path(repo_name)
        if dest.exists():
            Repo(str(dest)).remote().pull()
            return f"✅ 已更新项目:{repo_name}"
        Repo.clone_from(clone_url, str(dest))
        return f"✅ 已拉取项目:{repo_name}"
    except GitCommandError as e:
        return f"❌ Git 错误: {e}"
    except Exception as e:
        return f"❌ 拉取失败: {e}"
|
|
|
|
def create_and_switch_branch(project_name: str, new_branch: str) -> Tuple[str, List[str]]:
    """Checkout `new_branch`, creating it first when it does not exist.

    Returns (status message, refreshed branch list).
    """
    try:
        if not new_branch:
            return "❌ 请输入新分支名", list_branches(project_name)
        repo = Repo(str(_project_path(project_name)))
        existing = {head.name for head in repo.heads}
        if new_branch in existing:
            repo.git.checkout(new_branch)
        else:
            repo.git.checkout("-b", new_branch)
        return f"✅ 已切换到分支:{new_branch}", list_branches(project_name)
    except Exception as e:
        return f"❌ 创建/切换分支失败: {e}", list_branches(project_name)
|
|
|
|
def commit_changes(project_name: str, msg: str) -> str:
    """Stage everything and commit; report when the tree is already clean.

    A timestamped default message is used when `msg` is empty.
    """
    try:
        repo = Repo(str(_project_path(project_name)))
        repo.git.add("--all")
        if not repo.is_dirty(index=True, working_tree=True, untracked_files=True):
            return "ℹ️ 无变更可提交"
        commit_msg = msg or f"update at {datetime.now().isoformat()}"
        commit = repo.index.commit(commit_msg)
        return f"✅ 已提交:{commit.hexsha[:8]}"
    except Exception as e:
        return f"❌ 提交失败: {e}"
|
|
|
|
| |
def read_text_auto(p: Path) -> Tuple[str, str]:
    """Read a text file trying common encodings in order.

    Returns (text, encoding_used). The final latin-1 pass over raw bytes
    never fails, so a result is always produced (possibly lossy).
    """
    candidates = ("utf-8", "utf-8-sig", "gb18030", "gbk", "big5", "latin-1")
    for enc in candidates:
        try:
            return p.read_text(encoding=enc), enc
        except Exception:
            continue
    raw = p.read_bytes()
    return raw.decode("latin-1", errors="ignore"), "latin-1"
|
|
|
|
def save_text_keep(p: Path, text: str, original_enc: str, backup=True):
    """Write `text` back in its original encoding, optionally keeping a .bak.

    The backup (a metadata-preserving copy at "<path>.bak") is made only
    when requested and the file already exists.
    """
    should_backup = backup and p.exists()
    if should_backup:
        shutil.copy2(p, f"{p}.bak")
    p.write_text(text, encoding=original_enc)
|
|
|
|
def unified_diff_str(old: str, new: str, file: Path, limit_kb: int = 512):
    """Unified diff of two texts, truncated once it exceeds `limit_kb` KB.

    Both "from" and "to" labels use the same file path. An identical pair
    yields "". Truncation appends a visible "[diff truncated]" marker.
    """
    cap = limit_kb * 1024
    pieces = []
    total = 0
    diff_iter = difflib.unified_diff(
        old.splitlines(keepends=True),
        new.splitlines(keepends=True),
        fromfile=str(file),
        tofile=str(file),
    )
    for line in diff_iter:
        pieces.append(line)
        total += len(line)
        if total > cap:
            pieces.append("\n... [diff truncated]\n")
            break
    return "".join(pieces)
|
|
|
|
def find_line_bounds(text: str, pos: int) -> Tuple[int, int, int]:
    """Locate the line containing offset `pos`.

    Returns (1-based line number, line start offset, line end offset);
    the end offset excludes the trailing newline and is len(text) on the
    last line.
    """
    line_no = text.count("\n", 0, pos) + 1
    # rfind returns -1 when pos sits on the first line, so +1 yields 0.
    start = text.rfind("\n", 0, pos) + 1
    end = text.find("\n", pos)
    if end == -1:
        end = len(text)
    return line_no, start, end
|
|
|
|
def preview_line(text: str, s: int, e: int, max_len=160) -> str:
    """Render the line containing span [s, e) with the span marked <<<...>>>.

    Results longer than `max_len` are shortened symmetrically around the
    middle with "..." inserted.

    Fix: the original bound the relative offsets to locals named `rs` and
    `re`, shadowing the `re` regex module inside this function — renamed
    to avoid that trap (behavior unchanged).
    """
    _, line_start, line_end = find_line_bounds(text, s)
    line = text[line_start:line_end]
    # Span offsets relative to the line; the end is clamped to line length.
    span_start = s - line_start
    span_end = min(len(line), e - line_start)
    marked = line[:span_start] + "<<<" + line[span_start:span_end] + ">>>" + line[span_end:]
    if len(marked) > max_len:
        keep = max_len // 2 - 3
        marked = marked[:keep] + "..." + marked[-keep:]
    return marked
|
|
|
|
def get_indent(line: str) -> int:
    """Count leading space/tab characters on `line` (each tab counts as one)."""
    stripped = line.lstrip(" \t")
    return len(line) - len(stripped)
|
|
|
|
def expand_paragraph(text: str, hit: int) -> Tuple[int, int]:
    """Expand offset `hit` to the enclosing blank-line-delimited paragraph.

    Returns (start, end) offsets. Scans backward line by line until a
    blank (whitespace-only) line is found, then forward likewise; the
    trailing blank line, when present, is included in the span.
    """
    # Walk backward: ps is the start offset of the line preceding s.
    s = hit
    while s > 0:
        prev_nl = text.rfind("\n", 0, s)
        ps = 0 if prev_nl == -1 else prev_nl + 1
        # A blank line ends the backward scan; the paragraph starts at ps.
        if text[ps:s].strip() == "":
            s = ps
            break
        s = ps
    # Walk forward one line at a time; nl is the offset just past each newline.
    e = hit
    n = len(text)
    while e < n:
        nl = text.find("\n", e)
        nl = n if nl == -1 else nl + 1
        # A blank line ends the forward scan and is included in the span.
        if text[e:nl].strip() == "":
            e = nl
            break
        e = nl
    return s, e
|
|
|
|
def expand_python_block(text: str, hit: int) -> Tuple[int, int]:
    """Expand offset `hit` to the enclosing Python def/class block.

    Scans backward line by line for a header starting with "def ",
    "class " or "async def "; the block start is pulled back over any
    stacked "@decorator" lines, and the end is the first subsequent
    non-blank, non-decorator line indented less than the header. Falls
    back to paragraph expansion when no header precedes the hit.
    """
    i = hit
    header_start = None
    deco_start = None
    while i >= 0:
        ln, ls, le = find_line_bounds(text, i)
        line = text[ls:le]
        if line.lstrip().startswith(("def ", "class ", "async def ")):
            header_start = ls
            # Pull the start back over any decorator lines above the header.
            j = ls
            while True:
                prev_nl = text.rfind("\n", 0, j - 1)
                ps = 0 if prev_nl == -1 else prev_nl + 1
                l2 = text[ps:j]
                if l2.lstrip().startswith("@"):
                    deco_start = ps
                    j = ps
                    continue
                break
            start = deco_start if deco_start is not None else header_start
            # The block ends at the first meaningful line dedented below the header.
            base_indent = get_indent(line)
            k = le
            end = len(text)
            while k < len(text):
                nl = text.find("\n", k)
                nl = len(text) if nl == -1 else nl + 1
                l3 = text[k:nl]
                if k > header_start and l3.strip() != "" and not l3.lstrip().startswith("@"):
                    if get_indent(l3) < base_indent:
                        end = k
                        break
                k = nl
            return start, end
        # Move to the end of the previous line and keep scanning upward.
        i = ls - 1
    # No def/class header above the hit: treat it as plain text.
    return expand_paragraph(text, hit)
|
|
|
|
def expand_brace_block(text: str, hit: int) -> Tuple[int, int]:
    """Expand offset `hit` to the nearest preceding {...} block.

    Finds the closest '{' at or before `hit`, then walks right counting
    nested braces until the match closes, returning (open, close+1).
    Falls back to paragraph expansion when no '{' precedes the hit or the
    braces never balance.
    """
    open_idx = -1
    if text:
        open_idx = text.rfind("{", 0, min(hit, len(text) - 1) + 1)
    if open_idx != -1:
        depth = 0
        for k in range(open_idx, len(text)):
            ch = text[k]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return open_idx, k + 1
    return expand_paragraph(text, hit)
|
|
|
|
def auto_expand(text: str, hit: int, ext: str) -> Tuple[int, int]:
    """Choose an expansion strategy for *hit* from the file extension.

    Python files use def/class block expansion; files containing both
    braces use brace matching; everything else expands to the paragraph.
    """
    python_exts = (".py", ".pyw")
    if ext in python_exts:
        return expand_python_block(text, hit)
    has_braces = "{" in text and "}" in text
    if has_braces:
        return expand_brace_block(text, hit)
    return expand_paragraph(text, hit)
|
|
|
|
def build_pattern_from_input(s: str) -> re.Pattern:
    """Compile user search input into a regex pattern.

    Input shaped like ``/pattern/flags`` (flags: i, m, s) is compiled as a
    real regex; any other input is matched literally (escaped, with DOTALL).
    """
    s = s.strip()
    looks_like_regex = len(s) >= 2 and s[0] == "/" and s.count("/") >= 2
    if looks_like_regex:
        last = s.rfind("/")
        body = s[1:last]
        flag_chars = s[last + 1:].lower()
        flags = 0
        for ch, fl in (("i", re.I), ("m", re.M), ("s", re.S)):
            if ch in flag_chars:
                flags |= fl
        return re.compile(body, flags)
    return re.compile(re.escape(s), re.S)
|
|
|
|
| |
def build_file_link_html(path: str, label: str = "") -> str:
    """Render a download ``<a>`` tag for *path* via the Gradio /file route.

    Returns a placeholder ``<i>`` message when the path is empty or does
    not exist, and an error message string on any unexpected failure.
    """
    try:
        if not path or not os.path.exists(path):
            return "<i>(暂无可下载文件)</i>"
        quoted = urllib.parse.quote(path)
        text_label = label if label else Path(path).name
        return f"<a href='/file={quoted}' download style='text-decoration:none;'>📥 {html.escape(text_label)}</a>"
    except Exception as e:
        return f"<i>生成链接失败: {e}</i>"
|
|
|
|
def render_chat_html(hist: str) -> str:
    """Render the '|||'-serialized chat history as alternating HTML bubbles."""
    parts = (hist or "").split("|||")
    out = ["<div class='chat-container' style='scroll-behavior:smooth;'>"]
    # parts[0] is the (empty) prefix; user/assistant pairs follow.
    idx = 1
    while idx < len(parts):
        user_msg = parts[idx]
        bot_msg = parts[idx + 1] if idx + 1 < len(parts) else ""
        out.append(f"<div class='user-msg'><div class='msg-avatar'>🧑</div><div class='msg-content'>{user_msg}</div></div>")
        out.append(f"<div class='bot-msg'><div class='msg-avatar'>🤖</div><div class='msg-content'>{bot_msg}</div></div>")
        idx += 2
    out.append("</div>")
    return "\n".join(out)
|
|
|
|
def parse_chat_history(hist: str) -> List[Dict[str, str]]:
    """Deserialize the '|||'-joined history into [{id, user, assistant}] turns.

    HTML entities stored by the renderer are unescaped back to plain text;
    ``id`` is a 1-based turn number.
    """
    parts = (hist or "").split("|||")
    turns = []
    for i in range(1, len(parts), 2):
        assistant = html.unescape(parts[i + 1]) if i + 1 < len(parts) else ""
        turns.append({
            "id": (i // 2) + 1,
            "user": html.unescape(parts[i]),
            "assistant": assistant,
        })
    return turns
|
|
|
|
def chat_choices_from_hist(hist: str) -> List[str]:
    """Build dropdown labels ('NN | snippet…') for each chat turn in *hist*."""
    labels = []
    for turn in parse_chat_history(hist):
        user_text = turn["user"]
        snippet = user_text.replace("\n", " ")[:30]
        if len(user_text) > 30:
            snippet += "…"
        labels.append(f"{turn['id']:02d} | {snippet or '(空)'}")
    return labels
|
|
|
|
def preview_chat_by_choice(hist: str, choice: str) -> str:
    """Return the full user/AI text for the dropdown selection *choice*.

    *choice* is a label like '03 | snippet'; the numeric prefix selects the
    turn.  All failures are reported as user-facing strings, never raised.
    """
    try:
        if not choice:
            return "(未选择)"
        turn_id = int(choice.split("|")[0].strip())
        match = next((t for t in parse_chat_history(hist) if t["id"] == turn_id), None)
        if match is None:
            return "(未找到该轮对话)"
        user_text = match["user"] or "(空)"
        ai_text = match["assistant"] or "(空)"
        return f"【用户】\n{user_text}\n\n【AI】\n{ai_text}"
    except Exception as e:
        return f"(预览失败:{e})"
|
|
|
|
def plain_chat_text(hist: str) -> str:
    """Flatten the whole chat history into plain, copyable text."""
    separator = "-" * 60
    chunks = []
    for turn in parse_chat_history(hist):
        chunks.append(f"【用户】\n{turn['user']}\n\n【AI】\n{turn['assistant']}")
        chunks.append(separator)
    return "\n".join(chunks).strip() or "(空)"
|
|
|
|
| |
def build_theme_css(mode: str, img_path: str) -> str:
    """Return extra CSS for the chosen UI background *mode*.

    Modes: "美化" (gradient), "白色" (plain white), "自定义背景" (user image,
    only when *img_path* exists on disk); any other value yields the
    default purple gradient.  The common chunk is always prepended.
    """
    common = """
/* 紧凑聊天工具条与容器间距 */
.chat-container{margin-bottom:8px}
.chat-toolbar{margin-top:6px;gap:8px}
/* 顶部Logo按钮已美化,见静态CSS */
"""
    if mode == "美化":
        return common + """
body{
  background: radial-gradient(1200px 600px at 10% 10%, rgba(255,255,255,.15), transparent 60%),
              radial-gradient(1200px 600px at 90% 30%, rgba(255,255,255,.12), transparent 60%),
              linear-gradient(135deg, #0ea5e9 0%, #9333ea 100%);
  background-attachment: fixed;
}
"""
    if mode == "白色":
        return common + """
body{ background:#ffffff!important; }
"""
    # Custom image mode only applies when the file actually exists.
    if mode == "自定义背景" and img_path and os.path.exists(img_path):
        href = f"/file={urllib.parse.quote(img_path)}"
        return common + f"""
body{{
  background: url('{href}') center/cover no-repeat fixed;
}}
"""
    # Fallback: default gradient.
    return common + """
body{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%)}
"""
|
|
|
|
def set_ui_theme(mode: str, img_path: str) -> str:
    """Persist the UI background mode and image path; returns save status."""
    conf = load_all()
    conf.update({"ui_background_mode": mode, "ui_background_img": img_path or ""})
    return save_json(conf, auto=True)
|
|
|
|
def preload_theme_settings():
    """Load saved background settings; return (radio update, <style> HTML)."""
    conf = load_all()
    bg_mode = conf.get("ui_background_mode", "默认")
    bg_img = conf.get("ui_background_img", "")
    css_txt = build_theme_css(bg_mode, bg_img)
    return gr.update(value=bg_mode), f"<style id='dynamic-css'>{css_txt}</style>"
|
|
|
|
| |
def chat_send_common(
    hist, message, chat_extra_files, task_files, pvd, mdl, last_summary,
    task_text, req_kws_txt, exp_txt, allow_run, allow_full_shell, *kv,
):
    """Process one chat turn shared by both chat panels.

    Builds a prompt from the task context and attachments, calls the LLM,
    and — when *allow_run* is set — executes JSON-embedded "commands" and
    "mcp" instructions found in the answer.

    Returns (new_hist, rendered_html, raw_answer_or_empty, retry_payload).
    *kv* carries credential values positionally in keys_map order.
    """
    # Empty message: just re-render the current history unchanged.
    if not (message or "").strip():
        safe = render_chat_html(hist or "")
        return hist or "", safe, "", {}

    # Resolve attachment paths; task files first, duplicates dropped in order.
    extra_paths = [resolve_file_path(p) for p in (chat_extra_files or []) if resolve_file_path(p)]
    task_paths = [resolve_file_path(p) for p in (task_files or []) if resolve_file_path(p)]
    merged = []
    seen = set()
    for p in task_paths + extra_paths:
        if p and p not in seen:
            merged.append(p)
            seen.add(p)

    # Context preamble: last run summary, current task, constraints.
    sys_hint = ""
    if (last_summary or "").strip():
        sys_hint += f"【最近任务摘要】\n{last_summary}\n"
    if (task_text or "").strip():
        sys_hint += f"【当前任务】\n{task_text}\n"
    if (req_kws_txt or "").strip():
        sys_hint += f"【必须包含】{req_kws_txt}\n"
    if (exp_txt or "").strip():
        sys_hint += f"【期望输出片段】{exp_txt}\n"
    if sys_hint:
        sys_hint += "\n"

    chat_ctx = build_attachments_preview(merged, per_file_chars=1200, max_files=5, max_total_chars=8000)
    user_prompt = sys_hint + (message or "") + ("\n\n" + chat_ctx if chat_ctx else "")

    # Rebuild the named credentials dict from the positional *kv values.
    keys = {}
    keys_map = [
        "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key",
        "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base",
        "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title",
        "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base",
        "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base"
    ]
    for i, v in enumerate(kv):
        if i < len(keys_map):
            keys[keys_map[i]] = v

    ans = llm_call(pvd, mdl, user_prompt, keys, req_timeout=60)

    # Optionally act on structured JSON blocks embedded in the answer.
    exec_report = ""
    try:
        if allow_run:
            # Collect candidate JSON objects; the first parseable one with a
            # "commands" list wins.
            blocks = re.findall(r"```json\s*(\{.*?\})\s*```", str(ans), re.S) + re.findall(r"(\{.*?\})", str(ans), re.S)
            cmds = []
            for b in blocks:
                try:
                    j = json.loads(b)
                    if isinstance(j, dict) and isinstance(j.get("commands"), list):
                        cmds = [str(c).strip() for c in j["commands"] if str(c).strip()]
                        break
                except Exception:
                    continue
            if cmds:
                logs = run_commands(cmds, bool(allow_full_shell))
                exec_report += "\n\n[已根据AI建议执行命令]\n" + (logs[-4000:] if len(logs) > 4000 else logs)

            # Likewise, the first JSON object with an "mcp" dict drives
            # MCP tool install/uninstall/config operations.
            mcp_ops = {}
            for b in blocks:
                try:
                    j = json.loads(b)
                    if isinstance(j, dict) and isinstance(j.get("mcp"), dict):
                        mcp_ops = j["mcp"]
                        break
                except Exception:
                    continue
            if mcp_ops:
                msgs = []
                if isinstance(mcp_ops.get("install"), list):
                    for tid in mcp_ops["install"]:
                        msgs.append(f"安装 {tid}: {mcp.install(str(tid))}")
                if isinstance(mcp_ops.get("uninstall"), list):
                    for tid in mcp_ops["uninstall"]:
                        msgs.append(f"卸载 {tid}: {mcp.uninstall(str(tid))}")
                if isinstance(mcp_ops.get("config"), dict):
                    for tid, cfg in mcp_ops["config"].items():
                        try:
                            msgs.append(f"配置 {tid}: {mcp.save_config(str(tid), json.dumps(cfg, ensure_ascii=False))}")
                        except Exception as e:
                            msgs.append(f"配置 {tid}: 失败 {e}")
                if msgs:
                    exec_report += "\n\n[已执行MCP操作]\n" + "\n".join(msgs[-50:])
    except Exception as e:
        exec_report += f"\n\n[执行阶段异常] {e}"

    # Append the escaped turn to the serialized history and render it.
    # "❗"-prefixed answers are error messages: suppress them from the raw slot.
    display_ans = str(ans) + (exec_report if exec_report else "")
    safe_msg = html.escape((message or "") + ("\n\n[已附加附件上下文]" if chat_ctx else ""))
    safe_ans = html.escape(display_ans)
    new_hist = (hist or "") + "|||" + safe_msg + "|||" + safe_ans
    html_render = render_chat_html(new_hist)
    payload = {"message": message or "", "extra_paths": extra_paths, "ts": time.time()}
    return new_hist, html_render, (display_ans if not str(ans).startswith("❗") else ""), payload
|
|
|
|
def chat_retry_common(
    payload, hist, task_files, pvd, mdl, last_summary,
    task_text, req_kws_txt, exp_txt, allow_run, allow_full_shell, *kv,
):
    """Re-send a previous chat turn using the payload chat_send_common saved.

    Mirrors chat_send_common but takes the message and extra attachment
    paths from *payload* instead of fresh UI inputs, and returns only
    (new_hist, rendered_html, raw_answer_or_empty) — no new retry payload.
    """
    msg = (payload or {}).get("message", "")
    extra_paths = (payload or {}).get("extra_paths", [])
    # Nothing to retry: re-render the existing history unchanged.
    if not (msg or "").strip():
        safe = render_chat_html(hist or "")
        return hist or "", safe, ""

    # Merge current task files with the payload's attachment paths.
    task_paths = [resolve_file_path(p) for p in (task_files or []) if resolve_file_path(p)]
    merged = []
    seen = set()
    for p in task_paths + extra_paths:
        if p and p not in seen:
            merged.append(p)
            seen.add(p)

    # Context preamble: last run summary, current task, constraints.
    sys_hint = ""
    if (last_summary or "").strip():
        sys_hint += f"【最近任务摘要】\n{last_summary}\n"
    if (task_text or "").strip():
        sys_hint += f"【当前任务】\n{task_text}\n"
    if (req_kws_txt or "").strip():
        sys_hint += f"【必须包含】{req_kws_txt}\n"
    if (exp_txt or "").strip():
        sys_hint += f"【期望输出片段】{exp_txt}\n"
    if sys_hint:
        sys_hint += "\n"

    chat_ctx = build_attachments_preview(merged, per_file_chars=1200, max_files=5, max_total_chars=8000)
    user_prompt = sys_hint + msg + ("\n\n" + chat_ctx if chat_ctx else "")

    # Rebuild the named credentials dict from the positional *kv values.
    keys = {}
    keys_map = [
        "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key",
        "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base",
        "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title",
        "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base",
        "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base"
    ]
    for i, v in enumerate(kv):
        if i < len(keys_map):
            keys[keys_map[i]] = v

    ans = llm_call(pvd, mdl, user_prompt, keys, req_timeout=60)

    # Optionally act on structured JSON blocks embedded in the answer.
    exec_report = ""
    try:
        if allow_run:
            # First parseable JSON object with a "commands" list wins.
            blocks = re.findall(r"```json\s*(\{.*?\})\s*```", str(ans), re.S) + re.findall(r"(\{.*?\})", str(ans), re.S)
            cmds = []
            for b in blocks:
                try:
                    j = json.loads(b)
                    if isinstance(j, dict) and isinstance(j.get("commands"), list):
                        cmds = [str(c).strip() for c in j["commands"] if str(c).strip()]
                        break
                except Exception:
                    continue
            if cmds:
                logs = run_commands(cmds, bool(allow_full_shell))
                exec_report += "\n\n[已根据AI建议执行命令]\n" + (logs[-4000:] if len(logs) > 4000 else logs)
            # Likewise for MCP install/uninstall/config operations.
            mcp_ops = {}
            for b in blocks:
                try:
                    j = json.loads(b)
                    if isinstance(j, dict) and isinstance(j.get("mcp"), dict):
                        mcp_ops = j["mcp"]
                        break
                except Exception:
                    continue
            if mcp_ops:
                msgs = []
                if isinstance(mcp_ops.get("install"), list):
                    for tid in mcp_ops["install"]:
                        msgs.append(f"安装 {tid}: {mcp.install(str(tid))}")
                if isinstance(mcp_ops.get("uninstall"), list):
                    for tid in mcp_ops["uninstall"]:
                        msgs.append(f"卸载 {tid}: {mcp.uninstall(str(tid))}")
                if isinstance(mcp_ops.get("config"), dict):
                    for tid, cfg in mcp_ops["config"].items():
                        try:
                            msgs.append(f"配置 {tid}: {mcp.save_config(str(tid), json.dumps(cfg, ensure_ascii=False))}")
                        except Exception as e:
                            msgs.append(f"配置 {tid}: 失败 {e}")
                if msgs:
                    exec_report += "\n\n[已执行MCP操作]\n" + "\n".join(msgs[-50:])
    except Exception as e:
        exec_report += f"\n\n[执行阶段异常] {e}"

    # Append the escaped turn to the serialized history and render it.
    display_ans = str(ans) + (exec_report if exec_report else "")
    safe_msg = html.escape(msg + ("\n\n[已附加附件上下文]" if chat_ctx else ""))
    safe_ans = html.escape(display_ans)
    new_hist = (hist or "") + "|||" + safe_msg + "|||" + safe_ans
    html_render = render_chat_html(new_hist)
    return new_hist, html_render, (display_ans if not str(ans).startswith("❗") else "")
|
|
|
|
def run_commands(cmds: List[str], allow_full_shell: bool, timeout: int = 900) -> str:
    """Execute *cmds* sequentially and return the combined log text.

    Without *allow_full_shell* only pip-style commands (SAFE_PREFIXES) are
    allowed and run with shell=False via shlex splitting; with it, the raw
    command string runs in a shell (dangerous by design, gated by a UI
    checkbox).  Each command is limited to *timeout* seconds and can be
    interrupted via the global STOP_FLAG.
    """
    if not cmds:
        return "ℹ️ 无可执行命令"
    logs = []
    SAFE_PREFIXES = ("python -m pip", "pip ", "pip3 ", sys.executable + " -m pip")
    for cmd in cmds:
        try:
            # Whitelist check when full shell is disabled.
            if not allow_full_shell and not cmd.startswith(SAFE_PREFIXES):
                logs.append(f"⛔ 已拦截(需启用完全Shell): {cmd}")
                continue
            if allow_full_shell:
                p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            else:
                parts = shlex.split(cmd)
                p = subprocess.Popen(parts, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            _register_proc(p)  # lets the global stop button kill it
            deadline = time.time() + timeout
            out_lines = []
            err_lines = []
            # Polling read loop over both pipes until EOF / timeout / stop.
            # NOTE(review): stderr.readline() can block while the child only
            # writes to stdout (and vice versa), stalling this loop; reader
            # threads or selectors would be safer — TODO confirm acceptable.
            while True:
                if STOP_FLAG or time.time() > deadline:
                    try:
                        p.kill()
                    except Exception:
                        pass
                    break
                line = p.stdout.readline()
                if line:
                    out_lines.append(line)
                el = p.stderr.readline()
                if el:
                    err_lines.append(el)
                # Both pipes drained and the process exited: done.
                if not line and not el and p.poll() is not None:
                    break
                time.sleep(0.01)
            rc = p.poll()
            # Drain whatever remains after kill/exit (best effort).
            try:
                o, e = p.communicate(timeout=0.2)
                if o:
                    out_lines.append(o)
                if e:
                    err_lines.append(e)
            except Exception:
                pass
            out_txt = "".join(out_lines)
            err_txt = "".join(err_lines)
            # Normalize the return code for timeout (124) and manual stop (130).
            if time.time() > deadline and rc is None:
                rc = 124
                err_txt += "\n执行超时({}s)".format(timeout)
            if STOP_FLAG and (rc is None or rc == -9):
                rc = 130
                err_txt += "\n[已手动停止]"
            logs.append(f"$ {cmd}\nrc={rc}\n{out_txt[-4000:]}\n{err_txt[-4000:]}")
        except Exception as e:
            logs.append(f"$ {cmd}\n异常: {e}")
    return "\n\n".join(logs)
| |
| |
# Static stylesheet for the whole Gradio app: layout cards, chat bubbles,
# the floating stop button, and mobile (<=900px) overrides.  Passed to
# gr.Blocks(css=...) below.
css = r"""
*{font-family:Inter,system-ui,-apple-system,sans-serif!important;-webkit-tap-highlight-color:transparent}
body{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%)}
.gradio-container{max-width:min(1400px,100vw)!important;margin:0 auto!important;padding:18px!important}
.custom-card{background:rgba(255,255,255,.98);border-radius:16px;padding:18px;box-shadow:0 8px 15px -3px rgba(0,0,0,.08);border:1px solid rgba(0,0,0,.06)}
.chat-container{min-height:300px;max-height:600px;overflow:auto;background:#fff;border-radius:14px;border:1px solid rgba(0,0,0,.06);padding:12px;margin-bottom:8px}
.user-msg,.bot-msg{display:flex;gap:10px;margin:10px 0;align-items:flex-start}
.msg-avatar{flex:0 0 36px;width:36px;height:36px;border-radius:50%;display:flex;align-items:center;justify-content:center;background:#e5e7eb}
.user-msg .msg-avatar{background:#667eea;color:#fff}
.bot-msg .msg-avatar{background:#f093fb;color:#fff}
.msg-content{flex:1;padding:10px 12px;border-radius:12px;max-width:100%;white-space:pre-wrap;word-wrap:break-word;background:#f7f7f9}
.chat-toolbar{gap:8px;margin-top:6px}
.chat-toolbar .gr-button{padding:6px 10px;border-radius:10px}
.chat-toolbar .gr-file{max-height:42px;overflow:hidden}
.tiny-files .label,.tiny-files label{font-size:12px;opacity:.8}
.debug-logs{font-family:monospace!important;font-size:11px;background:#0b1021;color:#aaf255;padding:10px;border-radius:8px}
pre, code{white-space:pre;overflow-x:auto}

/* 顶部浮动Logo按钮 */
#global-stop-btn{position:fixed;top:10px;right:10px;z-index:9999}
#global-stop-btn button{
  background:linear-gradient(135deg,#8b5cf6 0%,#06b6d4 100%);
  color:#fff;width:36px;height:36px;min-height:36px;padding:0;border-radius:10px;line-height:36px;border:0;
  box-shadow:0 6px 16px rgba(139,92,246,.35);font-size:18px
}
#global-stop-btn button:hover{filter:brightness(1.05)}

/* 移动端优化 */
@media (max-width: 900px){
  .gradio-container{max-width:100vw!important;padding:10px!important}
  .gr-row{flex-direction:column!important}
  .gr-column{min-width:100%!important;max-width:100%!important}
  .custom-card{padding:12px}
  .chat-container{max-height:60vh}
}
"""
|
|
# Inline <script> injected into the page: clipboard copy helper, viewport
# meta injection, long-press-to-paste for empty textareas (mobile), and a
# generated gradient robot favicon.
js_code = r"""
<script>
window.copyText=(t)=>{if(t===undefined||t===null)return;navigator.clipboard.writeText(String(t)).then(()=>alert('已复制')).catch(()=>{const e=document.createElement('textarea');e.value=String(t);e.style.position='fixed';e.style.opacity='0';document.body.appendChild(e);e.select();document.execCommand('copy');document.body.removeChild(e);alert('已复制')})};
/* viewport 注入 */
(function(){const has=document.querySelector('meta[name="viewport"]');if(!has){const m=document.createElement('meta');m.name='viewport';m.content='width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no';document.head.appendChild(m);}})();
/* 长按空白输入框粘贴(移动端) */
(function(){
  let timer=null;
  function addLongPress(el){
    const start=(e)=>{ if(timer)clearTimeout(timer); timer=setTimeout(async ()=>{
      try{
        if(!el.value || el.value.trim()===''){
          const txt=await navigator.clipboard.readText();
          if(txt){ el.value=txt; el.dispatchEvent(new Event('input',{bubbles:true})); }
          else{ alert('剪贴板为空,请使用系统粘贴'); }
        }
      }catch(err){ alert('无法读取剪贴板,请使用系统粘贴'); }
    },650); };
    const end=()=>{ if(timer)clearTimeout(timer); };
    el.addEventListener('touchstart',start); el.addEventListener('mousedown',start);
    el.addEventListener('touchend',end); el.addEventListener('mouseup',end); el.addEventListener('mouseleave',end);
  }
  const bind=()=>{ document.querySelectorAll('textarea').forEach(addLongPress); };
  document.addEventListener('DOMContentLoaded',bind);
  const obs=new MutationObserver(bind); obs.observe(document.documentElement,{childList:true,subtree:true});
})();

/* 注入渐变 favicon(机器人) */
(function(){
  function setFavicon(){
    const svg = `<svg xmlns='http://www.w3.org/2000/svg' width='64' height='64'>
      <defs><linearGradient id='g' x1='0' y1='0' x2='1' y2='1'>
      <stop offset='0' stop-color='#8b5cf6'/><stop offset='1' stop-color='#06b6d4'/></linearGradient></defs>
      <rect width='64' height='64' rx='12' fill='url(#g)'/>
      <text x='32' y='42' text-anchor='middle' font-size='34' fill='white'>🤖</text></svg>`;
    const url='data:image/svg+xml;utf8,'+encodeURIComponent(svg);
    const old=document.querySelectorAll('link[rel="icon"]'); old.forEach(n=>n.parentNode.removeChild(n));
    const link=document.createElement('link'); link.rel='icon'; link.href=url; document.head.appendChild(link);
  }
  if(document.readyState==='loading'){document.addEventListener('DOMContentLoaded',setFavicon);} else {setFavicon();}
})();
</script>
"""
|
|
| |
# Restore the most recently saved task so the UI reopens where it left off.
_last = load_tasks_db().get("last", {})
LAST_TASK = _last
LAST_FILES = [f for f in _last.get("files", []) if f and os.path.exists(f)]  # drop files that no longer exist
LAST_CHAT_CODER = _last.get("chat_history", "")
LAST_CHAT_GENERAL = _last.get("chat_history_general", "")
|
|
|
|
| |
| with gr.Blocks( |
| title=APP_NAME, |
| css=css, |
| theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue"), |
| ) as app: |
| gr.HTML(js_code) |
| dynamic_css_html = gr.HTML("<style id='dynamic-css'></style>") |
|
|
| |
| last_ai_response = gr.State("") |
| last_run_summary = gr.State("") |
| last_chat_payload_coder = gr.State({}) |
| last_chat_payload_general = gr.State({}) |
| selected_tool_id = gr.State("") |
| ai_cmds_state = gr.State([]) |
| history_coder_str = gr.State(LAST_CHAT_CODER) |
| history_general_str = gr.State(LAST_CHAT_GENERAL) |
|
|
| |
| with gr.Row(): |
| global_stop_btn = gr.Button("⏹", elem_id="global-stop-btn") |
| top_hint = gr.Markdown("### 🧭 团队规划可自定义:在左侧'团队规划'中开启/修改/增减成员") |
|
|
| gr.Markdown("## ⚙️ 模型与密钥配置(缓存持久化 + 可用性检测 + 选择修复)") |
|
|
| with gr.Row(): |
| with gr.Column(scale=4): |
| |
| with gr.Group(elem_classes="custom-card"): |
| gr.Markdown("### 🎨 外观与背景") |
| bg_mode = gr.Radio( |
| choices=["默认", "白色", "美化", "自定义背景"], |
| value=load_all().get("ui_background_mode", "默认"), |
| label="背景模式", |
| interactive=True, |
| ) |
| with gr.Row(): |
| bg_file = gr.File( |
| label="上传自定义背景(图片)", |
| type="filepath", |
| file_types=[".png", ".jpg", ".jpeg", ".webp"], |
| scale=3, |
| ) |
| bg_apply_btn = gr.Button("应用背景", scale=1) |
| bg_reset_btn = gr.Button("恢复默认", scale=1) |
| bg_status = gr.Textbox(label="主题状态", interactive=False) |
|
|
| |
| with gr.Group(elem_classes="custom-card"): |
| with gr.Row(): |
| provider = gr.Radio( |
| provider_choices(), |
| value=load_all().get("ui_provider", "gemini"), |
| label="提供商", |
| interactive=True, |
| ) |
| with gr.Row(): |
| cached_init = get_models_cache(load_all().get("ui_provider", "gemini")) or RECOMMENDED_MODELS.get(load_all().get("ui_provider", "gemini"), []) |
| model = gr.Dropdown( |
| choices=cached_init, |
| allow_custom_value=True, |
| label="选择或输入模型(显示全部)", |
| interactive=True, |
| value=(load_all().get("ui_model") or (cached_init[0] if cached_init else "")), |
| ) |
| copy_model_btn = gr.Button("📋 复制", scale=0, min_width=60) |
| with gr.Row(): |
| refresh_btn = gr.Button("🔄 刷新模型列表(自动保存)") |
| test_btn = gr.Button("🔌 测试连接") |
| with gr.Row(): |
| rpm_input = gr.Number( |
| label="每分钟请求上限 (RPM, 0 表示不限)", |
| value=(get_saved_rpm(load_all().get("ui_provider", "gemini"), load_all().get("ui_model", "")) if load_all().get("ui_model") else 0), |
| precision=0, |
| ) |
| save_rpm_btn = gr.Button("💾 保存当前模型 RPM") |
| model_status = gr.Textbox(label="连接与设置状态", interactive=False, lines=2) |
|
|
| with gr.Accordion("可用性批量检测", open=False): |
| check_btn = gr.Button("🧪 开始检测(自动刷新后检测)") |
| model_info = gr.Textbox(label="检测信息", interactive=False) |
| model_table = gr.Dataframe( |
| headers=["模型", "推荐", "可用"], |
| datatype=["str", "str", "str"], |
| interactive=True, |
| ) |
| tested_model_select = gr.Dropdown( |
| label="从检测结果选择模型(仅显示可用)", |
| choices=[], |
| interactive=True, |
| ) |
| apply_tested_model_btn = gr.Button("应用所选模型") |
| model_list_text = gr.Textbox( |
| label="模型全列表(仅显示可用,可复制)", |
| lines=5, |
| interactive=False, |
| ) |
| copy_list_btn = gr.Button("📋 复制全部模型名") |
|
|
| |
| team_enabled_default, team_members_default, team_pri_default, team_conc_default = load_team_from_conf() |
| with gr.Group(elem_classes="custom-card"): |
| gr.Markdown("### 👥 团队规划(多代理协作)") |
| with gr.Row(): |
| team_enable_ck = gr.Checkbox( |
| value=team_enabled_default, |
| label="启用团队规划(默认关闭,可在此开启/关闭)", |
| ) |
| team_conc = gr.Slider(1, 16, team_conc_default, step=1, label="最大并发成员数") |
| team_pri = gr.Textbox( |
| label="行动优先级(高→低)", |
| value=team_pri_default or "run>code>reflect>stop", |
| placeholder="run>code>reflect>stop", |
| ) |
| gr.Markdown("成员列表(可自定义增/删/改;provider/model 为空则使用上方规划师设置)") |
| team_df = gr.Dataframe( |
| headers=["name", "provider", "model", "persona"], |
| value=[[m["name"], m.get("provider", ""), m.get("model", ""), m.get("persona", "")] for m in (team_members_default or [])], |
| interactive=True, |
| row_count=(len(team_members_default) or 4), |
| ) |
| with gr.Row(): |
| team_add_row_btn = gr.Button("➕ 新增成员行") |
| team_load_default_btn = gr.Button("📥 载入默认四人") |
| team_save_btn = gr.Button("💾 保存团队配置") |
| team_status = gr.Textbox(label="团队状态", interactive=False) |
|
|
| |
| with gr.Group(elem_classes="custom-card"): |
| gr.Markdown("### 🔐 API 凭据(自动保存,空值不覆盖已保存)") |
| remember_ck = gr.Checkbox(value=True, label="🧠 变更时自动保存") |
| save_status = gr.Textbox(label="保存状态", interactive=False) |
|
|
| with gr.Accordion("🧩 自定义 API 提供商 (OpenAI 兼容)", open=False): |
| with gr.Row(): |
| custom_list = gr.Dropdown(label="已保存的自定义提供商", choices=list(get_custom_providers().keys()), interactive=True) |
| custom_api_name = gr.Textbox(label="自定义提供商 ID", placeholder="例如 my-custom-api") |
| custom_api_base = gr.Textbox(label="API Base URL", placeholder="https://api.example.com/v1") |
| custom_api_key = gr.Textbox(label="API Key", type="password") |
| custom_api_referer = gr.Textbox(label="HTTP-Referer (可选)") |
| custom_api_title = gr.Textbox(label="X-Title (可选)") |
| with gr.Row(): |
| save_custom_api_btn = gr.Button("💾 保存/更新") |
| delete_custom_api_btn = gr.Button("🗑 删除选中") |
| reload_providers_btn = gr.Button("🔄 刷新提供商列表") |
| custom_api_status = gr.Textbox(label="自定义API状态", interactive=False) |
|
|
| with gr.Accordion("展开/折叠所有凭据", open=False): |
| with gr.Accordion("🪄 Gemini", open=False): |
| gemini_key = gr.Textbox(type="password", label="GOOGLE_API_KEY", value=load_all().get("gemini_key", "")) |
| with gr.Accordion("🧠 OpenAI (openai_like)", open=False): |
| openai_key = gr.Textbox(type="password", label="OPENAI_API_KEY", value=load_all().get("openai_key", "")) |
| openai_base = gr.Textbox(label="OPENAI_BASE_URL", value=load_all().get("openai_base", DEFAULT_BASES["openai"])) |
| with gr.Accordion("✨ Anthropic", open=False): |
| anthropic_key = gr.Textbox(type="password", label="ANTHROPIC_API_KEY", value=load_all().get("anthropic_key", "")) |
| with gr.Accordion("🧩 Cohere", open=False): |
| cohere_key = gr.Textbox(type="password", label="COHERE_API KEY", value=load_all().get("cohere_key", "")) |
| with gr.Accordion("🚀 Groq", open=False): |
| groq_key = gr.Textbox(type="password", label="GROQ_API_KEY", value=load_all().get("groq_key", "")) |
| groq_base = gr.Textbox(label="GROQ_BASE_URL", value=load_all().get("groq_base", DEFAULT_BASES["groq"])) |
| with gr.Accordion("🌪 Mistral", open=False): |
| mistral_key = gr.Textbox(type="password", label="MISTRAL_API KEY", value=load_all().get("mistral_key", "")) |
| mistral_base = gr.Textbox(label="MISTRAL_BASE_URL", value=load_all().get("mistral_base", DEFAULT_BASES["mistral"])) |
| with gr.Accordion("🧠 DeepSeek", open=False): |
| deepseek_key = gr.Textbox(type="password", label="DEEPSEEK_API KEY", value=load_all().get("deepseek_key", "")) |
| deepseek_base = gr.Textbox(label="DEEPSEEK_BASE_URL", value=load_all().get("deepseek_base", DEFAULT_BASES["deepseek"])) |
| with gr.Accordion("🌐 OpenRouter", open=False): |
| openrouter_key = gr.Textbox(type="password", label="OPENROUTER_API KEY", value=load_all().get("openrouter_key", "")) |
| openrouter_base = gr.Textbox(label="OPENROUTER_BASE URL", value=load_all().get("openrouter_base", DEFAULT_BASES["openrouter"])) |
| openrouter_referer = gr.Textbox(label="Referer(可选)", value=load_all().get("openrouter_referer", "")) |
| openrouter_title = gr.Textbox(label="Title(可选)", value=load_all().get("openrouter_title", "")) |
| with gr.Accordion("🔎 Perplexity", open=False): |
| perplexity_key = gr.Textbox(type="password", label="PPLX_API KEY", value=load_all().get("perplexity_key", "")) |
| perplexity_base = gr.Textbox(label="PPLX_BASE_URL", value=load_all().get("perplexity_base", DEFAULT_BASES["perplexity"])) |
| with gr.Accordion("🤖 xAI", open=False): |
| xai_key = gr.Textbox(type="password", label="XAI_API KEY", value=load_all().get("xai_key", "")) |
| xai_base = gr.Textbox(label="XAI_BASE_URL", value=load_all().get("xai_base", DEFAULT_BASES["xai"])) |
| with gr.Accordion("💠 Azure OpenAI", open=False): |
| azure_key = gr.Textbox(type="password", label="AZURE_KEY", value=load_all().get("azure_key", "")) |
| azure_base = gr.Textbox(label="AZURE_BASE", value=load_all().get("azure_base", DEFAULT_BASES["azure"])) |
| azure_deployment = gr.Textbox(label="DEPLOYMENT", value=load_all().get("azure_deployment", "")) |
| azure_version = gr.Textbox(label="API_VERSION", value=load_all().get("azure_version", "2024-02-15-preview")) |
| with gr.Accordion("🤗 Hugging Face", open=False): |
| hf_token = gr.Textbox(type="password", label="HF_TOKEN", value=load_all().get("hf_token", "")) |
| with gr.Accordion("🐙 GitHub", open=False): |
| github_token = gr.Textbox(type="password", label="GitHub PAT", value=load_all().get("github_token", "")) |
| with gr.Accordion("🧪 SiliconFlow", open=False): |
| siliconflow_key = gr.Textbox(type="password", label="SILICONFLOW_API KEY", value=load_all().get("siliconflow_key", "")) |
| siliconflow_base = gr.Textbox(label="SILICONFLOW_BASE_URL", value=load_all().get("siliconflow_base", DEFAULT_BASES["siliconflow"])) |
|
|
| |
| with gr.Column(scale=8): |
| with gr.Tabs() as main_tabs: |
| |
| with gr.Tab("🛠️ 自动编程(修复引擎)"): |
| with gr.Group(elem_classes="custom-card"): |
| global_hint = gr.Textbox( |
| label="🧭 全局提示词(修改后自动永久保存)", |
| value=load_all().get("global_hint", DEFAULT_GLOBAL_HINT_STR), |
| lines=3, |
| ) |
| task_input = gr.Textbox( |
| label="🧩 任务描述", |
| lines=5, |
| placeholder="例如:修复 OKX 交易脚本下单报错;保持原有功能与结构", |
| value=LAST_TASK.get("task", ""), |
| elem_id="task_input", |
| ) |
| with gr.Row(): |
| paste_btn = gr.Button("📋 粘贴到任务") |
| attach_files = gr.Files( |
| label="📎 附件(自动保存副本至 uploads/)", |
| type="filepath", |
| file_types=None, |
| file_count="multiple", |
| elem_classes="tiny-files", |
| ) |
| with gr.Accordion("🗂 历史附件(从 uploads/ 选择)", open=False): |
| with gr.Row(): |
| hist_files_dropdown = gr.Dropdown( |
| label="选择历史附件(多选,默认最新)", |
| choices=[], |
| multiselect=True, |
| scale=6, |
| value=[], |
| ) |
| hist_files_refresh_btn = gr.Button("🔄 刷新", scale=1) |
| hist_files_apply_btn = gr.Button("✅ 使用所选", scale=1) |
| with gr.Accordion("📄 基线代码(可选)", open=False): |
| baseline_code = gr.Code( |
| label="若填入,将优先以此作为起始 main.py", |
| language="python", |
| value=LAST_TASK.get("baseline_code", ""), |
| lines=14, |
| ) |
| with gr.Row(): |
| required_kws = gr.Textbox(label="必须包含的关键词/结构(逗号分隔)", value=LAST_TASK.get("required_kws", "")) |
| expected_stdout = gr.Textbox(label="期望 stdout 片段(可选)", value=LAST_TASK.get("expected_stdout", "")) |
| cli_args = gr.Textbox(label="运行参数(可选)", value=LAST_TASK.get("cli_args", ""), placeholder="如: --symbol BTC-USDT-SWAP --size 1") |
| with gr.Row(elem_classes="chat-toolbar"): |
| max_attempts = gr.Slider(1, 10, 3, step=1, label="最大尝试次数") |
| timeout_sec = gr.Slider(5, 600, 40, step=1, label="运行超时(秒/次)") |
| run_btn = gr.Button("▶️ 开始自主修复", variant="primary") |
| stop_btn = gr.Button("⏹ 停止") |
| hist_init = get_task_history_titles() |
| with gr.Row(): |
| new_task_btn = gr.Button("🆕 新建任务") |
| save_task_btn = gr.Button("💾 保存任务") |
| restore_last_btn = gr.Button("📥 恢复最近任务") |
# --- Task history: pick one of the 20 most recent saved tasks and reload it ---
history_select = gr.Dropdown(label="任务历史(最近20条)", choices=hist_init, value=(hist_init[0] if hist_init else None), interactive=True)
load_history_btn = gr.Button("📂 载入所选历史")

# --- Per-task chat panel (history is separate from the "智能对话" tab) ---
with gr.Accordion("💬 任务对话(独立于'智能对话')", open=True):
    coder_chat_html = gr.HTML(render_chat_html(LAST_CHAT_CODER))
    with gr.Row():
        coder_user_in = gr.Textbox(label="", placeholder="与AI讨论任务、追加需求、追问原因…(共享自动编程附件)", lines=3, scale=6)
        coder_chat_files = gr.Files(label="📎", type="filepath", file_types=None, file_count="multiple", scale=1, elem_classes="tiny-files")
        coder_insert_files_btn = gr.Button("📥 插入附件内容", scale=1)
    with gr.Row(elem_classes="chat-toolbar"):
        coder_enable_shell = gr.Checkbox(label="允许AI执行命令", value=False)
        coder_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False)
        coder_send_btn = gr.Button("📨 发送", variant="secondary")
        coder_retry_btn = gr.Button("🔁 重试发送")
        coder_stop_chat_btn = gr.Button("⏹ 停止聊天")
        coder_send_to_coder_btn = gr.Button("➡️ 发送到编程任务")
        coder_copy_msg_btn = gr.Button("📋 复制消息")
        coder_copy_chat_btn = gr.Button("📋 复制会话")
        coder_download_chat_btn = gr.Button("💾 下载会话")
    coder_chat_status = gr.Textbox(label="对话状态", interactive=False)
    with gr.Row():
        coder_download_chat_file = gr.File(label="🗎 会话TXT(任务)", interactive=False)
        coder_chat_download_link = gr.HTML("")

# --- Result panel: repaired code, status, per-run stdout/stderr, downloads ---
with gr.Accordion("🧾 修复后的 main.py(默认收起)", open=False):
    code_out = gr.Code(language="python")
with gr.Row():
    status_out = gr.Textbox(label="状态", interactive=False)
    attempts_out = gr.Number(label="尝试次数", interactive=False)
with gr.Row():
    stdout_out = gr.Textbox(label="📤 STDOUT(最后一次)", lines=8)
    stderr_out = gr.Textbox(label="📥 STDERR(最后一次)", lines=8)
with gr.Row():
    download_file = gr.File(label="⬇️ 下载 main.py", interactive=False)
    download_zip = gr.File(label="⬇️ 下载运行目录 ZIP", interactive=False)
    download_main_link = gr.HTML("")
    download_zip_link = gr.HTML("")
    run_folder = gr.Textbox(label="📂 本次运行目录", interactive=False)

# --- Live tail of the current run's stdout/stderr/pip.log ---
with gr.Row():
    watch_btn = gr.Button("👀 查看实时日志")
    stop_watch_btn = gr.Button("🛑 停止查看")
live_log = gr.Textbox(label="📟 实时日志(仅当前任务 STDOUT/STDERR/pip.log)", lines=20)

# --- Full in-memory debug log (DEBUG_BUFFER) viewer ---
with gr.Accordion("🪵 详细调试日志(完整记录)", open=False):
    debug_logs_display = gr.Textbox(label="完整运行日志(包含所有提示与响应、执行细节)", lines=18, interactive=False, elem_classes="debug-logs")
    with gr.Row():
        refresh_debug_btn = gr.Button("🔄 刷新")
        copy_debug_btn = gr.Button("📋 复制")
        clear_debug_btn = gr.Button("🗑 清空")

# --- On-disk log management + run-directory packaging ---
with gr.Row():
    with gr.Column(scale=2):
        gr.Markdown("### 🧾 日志管理(调试/运行)")
        log_selector = gr.Dropdown(label="选择日志文件", choices=[], interactive=True)
        log_info = gr.Textbox(label="文件信息", interactive=False)
        with gr.Row():
            logs_refresh_btn = gr.Button("🔄 刷新列表")
            logs_download_btn = gr.Button("⬇️ 下载所选")
            logs_clear_btn = gr.Button("🗑 清空全部")
        log_preview = gr.Textbox(label="📄 日志预览(尾部,可复制)", lines=12, interactive=False)
        download_log = gr.File(label="⬇️ 日志文件", interactive=False)
        logs_download_link = gr.HTML("")
        with gr.Row():
            download_all_btn = gr.Button("📦 下载全部日志(ZIP)")
            download_all_file = gr.File(label="⬇️ 全部日志 ZIP", interactive=False)
            download_all_link = gr.HTML("")
    with gr.Column(scale=1):
        gr.Markdown("### 📦 打包运行目录")
        pack_status = gr.Textbox(label="打包状态", interactive=False)
        pack_btn = gr.Button("📦 重新打包")
|
|
| |
# --- General-purpose chat tab (history separate from the coder chat) ---
with gr.Tab("🧠 智能对话"):
    chat_html = gr.HTML(render_chat_html(LAST_CHAT_GENERAL))
    with gr.Row():
        user_in = gr.Textbox(label="", placeholder="输入你的问题…(共享自动编程附件;支持多行)", lines=3, scale=6)
        chat_files = gr.Files(label="📎", type="filepath", file_types=None, file_count="multiple", elem_classes="tiny-files")
        insert_files_btn = gr.Button("📥 插入附件内容")
    with gr.Row():
        # Quote a single turn from the coder chat into this conversation.
        import_coder_turn_select = gr.Dropdown(label="从编程对话选择一条引用", choices=chat_choices_from_hist(LAST_CHAT_CODER), interactive=True, scale=3)
        import_coder_turn_preview = gr.Textbox(label="引用预览", lines=4, interactive=False, scale=5)
        import_coder_turn_btn = gr.Button("↘️ 插入到输入框", scale=1)
    with gr.Row(elem_classes="chat-toolbar"):
        chat_enable_shell = gr.Checkbox(label="允许AI执行命令", value=False)
        chat_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False)
        send_btn = gr.Button("📨 发送", variant="primary")
        chat_retry_btn = gr.Button("🔁 重试发送")
        stop_chat_btn = gr.Button("⏹ 停止聊天")
        send_to_coder_btn = gr.Button("➡️ 发送到编程任务")
        clear_btn = gr.Button("🗑 清空")
        copy_msg_btn = gr.Button("📋 复制消息")
        copy_chat_btn = gr.Button("📋 复制会话")
        download_chat_btn = gr.Button("💾 下载会话")
    send_status = gr.Textbox(label="发送状态", interactive=False)
    with gr.Row():
        download_chat_file = gr.File(label="🗎 会话TXT(智能对话)", interactive=False)
        chat_download_link = gr.HTML("")

# --- AI-assisted command generation / review / execution tab ---
with gr.Tab("🖥️ 命令执行"):
    with gr.Group(elem_classes="custom-card"):
        gr.Markdown("### 🖥️ 命令执行(AI 生成 + 审阅 + 执行)")
        with gr.Row():
            cmd_ctx_src = gr.Radio(choices=["编程对话", "智能对话"], value="编程对话", label="上下文来源", scale=1)
            chat_for_cmd_select = gr.Dropdown(label="选择对话轮次", choices=[], interactive=True, scale=2)
        selected_chat_preview = gr.Textbox(label="引用预览", lines=5, interactive=False)
        with gr.Row():
            ai_task = gr.Textbox(label="任务描述(自然语言)", placeholder="例如:安装 requests pydantic;清理缓存;导出日志", lines=3, scale=3)
            with gr.Column(scale=1):
                ai_suggest_btn = gr.Button("🤖 从描述生成命令")
                ai_from_chat_btn = gr.Button("🧠 从所选对话生成命令")
                ai_from_all_chat_btn = gr.Button("📚 从整段对话生成命令")
        with gr.Row(elem_classes="chat-toolbar"):
            ai_enable_shell = gr.Checkbox(label="允许执行命令", value=False)
            ai_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False)
            ai_run_btn = gr.Button("▶ 执行命令", variant="primary")
        with gr.Row():
            ai_cmds_json = gr.Code(label="命令 JSON(可编辑)", language="json", value="", lines=10, scale=2)
            shell_output = gr.Textbox(label="执行输出", lines=12, interactive=False, scale=3)
        ai_cmds_state = gr.State([])

# --- pip dependency management tab ---
with gr.Tab("📦 依赖管理"):
    with gr.Group(elem_classes="custom-card"):
        gr.Markdown("### 📦 依赖管理(pip)")
        dep_pkg_text = gr.Textbox(label="包名(可多个,空格分隔)", placeholder="如:requests pydantic", lines=2)
        with gr.Row():
            dep_install_btn = gr.Button("⬇ 安装/升级")
            dep_uninstall_btn= gr.Button("🗑 卸载")
            dep_upgrade_btn = gr.Button("⬆ 升级(同 安装/升级)")
            dep_freeze_btn = gr.Button("📋 列出已安装(pip list)")
        with gr.Row(elem_classes="chat-toolbar"):
            dep_enable_shell = gr.Checkbox(label="允许执行命令", value=False)
            dep_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False)
        dep_out = gr.Textbox(label="输出", lines=14, interactive=False)

# --- Local GitHub project management tab ---
with gr.Tab("📁 项目管理"):
    with gr.Group(elem_classes="custom-card"):
        gr.Markdown("### 📁 GitHub 仓库管理")
        gh_url = gr.Textbox(label="仓库 URL", placeholder="https://github.com/user/repo.git")
        gh_pull_btn= gr.Button("⬇ 拉取/更新")
        gh_status = gr.Textbox(label="状态", interactive=False)
        with gr.Row():
            gh_refresh_btn = gr.Button("🔄 刷新项目列表")
            projects_dd = gr.Dropdown(label="本地项目(含 .git)", choices=[], interactive=True)
            branch_dd = gr.Dropdown(label="分支", choices=[], interactive=True)
        with gr.Row():
            new_branch = gr.Textbox(label="新分支名", placeholder="feature/x")
            branch_create_btn = gr.Button("🌿 创建/切换")
        files_df = gr.Dataframe(headers=["文件", "大小", "修改时间"], datatype=["str","str","str"], interactive=False)
        file_sel = gr.Dropdown(label="选择文件", choices=[], interactive=True)
        file_view = gr.Code(label="文件内容", language="python", lines=16)
        with gr.Row():
            file_save_btn = gr.Button("💾 保存文件")
            commit_msg = gr.Textbox(label="提交信息", placeholder="update ...")
            commit_btn = gr.Button("✅ 提交")
|
|
| |
|
|
| |
| def _dep_make_cmds(action, pkgs): |
| pkgs = (pkgs or "").strip() |
| if action in ("install", "upgrade"): |
| if not pkgs: |
| return [f"echo 请输入要安装/升级的包名"] |
| return [f"{sys.executable} -m pip install -U {pkgs}"] |
| if action == "uninstall": |
| if not pkgs: |
| return [f"echo 请输入要卸载的包名"] |
| return [f"{sys.executable} -m pip uninstall -y {pkgs}"] |
| if action == "freeze": |
| return [f"{sys.executable} -m pip list --format=freeze"] |
| return ["echo 未知动作"] |
|
|
# Wire the four pip buttons. Each handler is gated on the "允许执行命令"
# checkbox and forwards the generated command list to run_commands().
dep_install_btn.click(
    lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("install", pkgs), bool(full))),
    inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell],
    outputs=[dep_out],
)
dep_uninstall_btn.click(
    lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("uninstall", pkgs), bool(full))),
    inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell],
    outputs=[dep_out],
)
dep_upgrade_btn.click(
    lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("upgrade", pkgs), bool(full))),
    inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell],
    outputs=[dep_out],
)
# "freeze" takes no package names, only the permission checkboxes.
dep_freeze_btn.click(
    lambda enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("freeze", ""), bool(full))),
    inputs=[dep_enable_shell, dep_full_shell],
    outputs=[dep_out],
)
|
|
| |
def _gh_refresh_projects_ui():
    """Reload the local git-project dropdown, preselecting the first entry."""
    names = list_projects()
    first = names[0] if names else None
    return gr.update(choices=names, value=first)
|
|
def _gh_refresh_branches_files(proj):
    """Refresh branch list, file table and file dropdown for project `proj`.

    Returns (branch dropdown update, file-table rows, file dropdown update,
    cleared file-view text). With no project selected everything is emptied.
    """
    branches = list_branches(proj) if proj else []
    rows = list_files_in_project(proj) if proj else [["-", "-", "-"]]
    names = [r[0] for r in rows if r and r[0] not in ("-", "")]
    branch_upd = gr.update(choices=branches, value=(branches[0] if branches else None))
    file_upd = gr.update(choices=names, value=(names[0] if names else None))
    return branch_upd, rows, file_upd, ""
|
|
# Project-management wiring: list refresh, branch switch, pull, file edit.
gh_refresh_btn.click(_gh_refresh_projects_ui, outputs=[projects_dd])


# Changing the project refreshes its branches, file table and file picker.
projects_dd.change(_gh_refresh_branches_files, inputs=[projects_dd],
                   outputs=[branch_dd, files_df, file_sel, file_view])


# Selecting a branch creates/switches to it; only the status text is shown.
branch_dd.change(
    lambda proj, br: create_and_switch_branch(proj, br)[0],
    inputs=[projects_dd, branch_dd],
    outputs=[gh_status],
)


# Pull/clone the repo, then re-list local projects.
gh_pull_btn.click(
    lambda url, token: (pull_github_repo(url, token),),
    inputs=[gh_url, github_token],
    outputs=[gh_status],
).then(_gh_refresh_projects_ui, outputs=[projects_dd])


file_sel.change(lambda proj, rel: read_file_content(proj, rel), inputs=[projects_dd, file_sel], outputs=[file_view])
file_save_btn.click(lambda proj, rel, txt: save_file_content(proj, rel, txt), inputs=[projects_dd, file_sel, file_view], outputs=[gh_status])
commit_btn.click(lambda proj, msg: commit_changes(proj, msg), inputs=[projects_dd, commit_msg], outputs=[gh_status])
|
|
| |
|
|
| |
def update_run_links(
    code_out, status_out, attempts_out, stdout_out, stderr_out,
    download_file_val, download_zip_val, run_folder_val, debug_logs_display
):
    """Render HTML download links for the produced main.py and run-dir ZIP.

    Only the two *_val path arguments are used; the other parameters mirror
    the wired Gradio inputs and are ignored. Returns (main_html, zip_html).
    """
    if download_file_val:
        main_html = build_file_link_html(download_file_val, "下载 main.py")
    else:
        main_html = "<i>(无 main.py)</i>"
    if download_zip_val:
        zip_html = build_file_link_html(download_zip_val, "下载运行目录 ZIP")
    else:
        zip_html = "<i>(无 ZIP)</i>"
    return main_html, zip_html
|
|
def gather_keys_func(*kv):
    """Zip positional API-key values into a {field_name: value} dict.

    The field-name order must match the `all_key_inputs` wiring; surplus
    positional values beyond the known names are silently dropped.
    """
    names = (
        "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key",
        "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base",
        "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title",
        "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base",
        "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base",
    )
    # zip() truncates to the shorter sequence, matching the original's
    # "i < len(keys_map)" guard.
    return dict(zip(names, kv))
|
|
| |
| |
# Best-effort Gradio-internals hack: pin the root Blocks so components
# created in helper scopes attach to `app`. Fixed: the original used a bare
# `except:` which also swallows SystemExit/KeyboardInterrupt; narrowed to
# Exception so interpreter-control exceptions still propagate.
try:
    from gradio.context import Context as _GrCx
    _GrCx.root_block = app
except Exception:
    pass


# Global stop button: request a stop of all running work, then log client-side.
global_stop_btn.click(lambda: stop_all()).then(
    None, None, None,
    js="(...args) => { console.log('Stop button clicked', args); return args; }"
)
|
|
| |
def apply_background(mode, f):
    """Apply the chosen UI background; persists a custom image if supplied.

    Returns (status text, <style> tag carrying the generated CSS).
    """
    img = ""
    if mode == "自定义背景" and f:
        saved = persist_files_to_uploads([resolve_file_path(f)])
        if saved:
            img = saved[0]
    css = build_theme_css(mode, img)
    set_ui_theme(mode, img)
    suffix = f"({Path(img).name})" if img else ""
    return (
        f"✅ 已应用背景:{mode}" + suffix,
        f"<style id='dynamic-css'>{css}</style>",
    )
|
|
def reset_background():
    """Restore the default background and emit the matching CSS block."""
    default_css = build_theme_css("默认", "")
    set_ui_theme("默认", "")
    return "✅ 已恢复默认背景", f"<style id='dynamic-css'>{default_css}</style>"
|
|
# Background-theme wiring: both handlers update status text + injected CSS.
bg_apply_btn.click(apply_background, inputs=[bg_mode, bg_file], outputs=[bg_status, dynamic_css_html])
bg_reset_btn.click(reset_background, outputs=[bg_status, dynamic_css_html])
|
|
| |
def ui_refresh_provider_and_list():
    """Rebuild the provider dropdown and the custom-provider list.

    Keeps the saved provider selection when still available, otherwise falls
    back to the first choice (or "gemini"). Returns two gr.update objects.
    """
    choices = provider_choices()
    saved = load_all().get("ui_provider", "gemini")
    if saved in choices:
        value = saved
    else:
        value = choices[0] if choices else "gemini"
    customs = list(get_custom_providers().keys())
    debug("UI", "刷新提供商+自定义列表", {"providers": len(choices), "customs": len(customs)})
    return (
        gr.update(choices=choices, value=value),
        gr.update(choices=customs, value=(customs[0] if customs else None)),
    )
|
|
def ui_save_custom_api(name, base, key, ref, title):
    """Persist a custom provider, then refresh the provider + custom lists.

    Returns (status message, custom-list update, provider-dropdown update).
    """
    msg = save_custom_provider(name, base, key, referer=ref, title=title)
    p_upd, list_upd = ui_refresh_provider_and_list()
    try:
        # If the freshly saved name is now a valid provider choice, select it.
        # NOTE(review): the isinstance(p_upd, dict) check assumes gr.update()
        # returns a plain dict (Gradio 3.x behavior) — confirm against the
        # installed Gradio version.
        if name and isinstance(p_upd, dict) and name in (p_upd.get("choices") or []):
            p_upd = gr.update(choices=p_upd.get("choices"), value=name)
    except Exception:
        pass
    return msg, list_upd, p_upd
|
|
def ui_delete_custom_api(sel):
    """Remove the selected custom provider and refresh both dropdowns."""
    status = remove_custom_provider(sel)
    provider_upd, customs_upd = ui_refresh_provider_and_list()
    return status, customs_upd, provider_upd
|
|
def ui_load_custom_fields(sel):
    """Load one custom provider's stored fields into the edit boxes."""
    cfg = get_custom_providers().get(sel or "", {})
    return (
        sel or "",
        cfg.get("base", ""),
        cfg.get("key", ""),
        cfg.get("referer", ""),
        cfg.get("title", ""),
    )
|
|
# Custom-provider CRUD wiring.
save_custom_api_btn.click(
    ui_save_custom_api,
    inputs=[custom_api_name, custom_api_base, custom_api_key, custom_api_referer, custom_api_title],
    outputs=[custom_api_status, custom_list, provider],
)
delete_custom_api_btn.click(ui_delete_custom_api, inputs=[custom_list], outputs=[custom_api_status, custom_list, provider])
reload_providers_btn.click(ui_refresh_provider_and_list, outputs=[provider, custom_list])
# Selecting a custom provider loads its stored fields for editing.
custom_list.change(
    ui_load_custom_fields,
    inputs=[custom_list],
    outputs=[custom_api_name, custom_api_base, custom_api_key, custom_api_referer, custom_api_title],
)
|
|
| |
# Every API-key/base-URL textbox, in the exact order gather_keys_func expects.
all_key_inputs = [
    gemini_key, openai_key, openai_base, anthropic_key, cohere_key, groq_key, groq_base, mistral_key, mistral_base,
    deepseek_key, deepseek_base, openrouter_key, openrouter_base, openrouter_referer, openrouter_title, perplexity_key,
    perplexity_base, xai_key, xai_base, azure_key, azure_base, azure_deployment, azure_version, hf_token, github_token,
    siliconflow_key, siliconflow_base,
]
# Inputs whose changes trigger an autosave of the whole UI configuration.
all_inputs_for_save = [remember_ck, provider, model, github_token, global_hint, task_input] + all_key_inputs
|
|
# Autosave proxy: persist keys/provider/model/hint/task whenever any watched
# input changes, but only while the "remember" checkbox is ticked.
# FIX: the original defined this function twice, reassigned
# all_inputs_for_save a second time, and ran the .change() registration loop
# twice — so every input change fired two identical autosave callbacks.
# Collapsed to a single definition and a single registration pass.
def on_any_change_proxy(remember, pvd, mdl, token, hint, task, *kv):
    debug("UI", "on_change 触发", {"remember": bool(remember), "pvd": pvd, "mdl": mdl})
    if not remember:
        return ""
    keys = gather_keys_func(*kv)
    return auto_save_ui(keys, pvd, mdl, token, hint, last_task=task)


for comp in all_inputs_for_save:
    comp.change(on_any_change_proxy, inputs=all_inputs_for_save, outputs=save_status)
|
|
def save_hint_now(hint, pvd, mdl, token, task, *kv):
    """Persist the global hint immediately (explicit, not autosave-gated)."""
    debug("UI", "保存全局提示词", {"len": len(hint or "")})
    return save_all_ui(gather_keys_func(*kv), pvd, mdl, token, hint, last_task=task, auto=True)
|
|
# The global hint also saves immediately on change (independently of the
# remember-gated autosave loop above).
global_hint.change(
    save_hint_now,
    inputs=[global_hint, provider, model, github_token, task_input] + all_key_inputs,
    outputs=[save_status],
)
|
|
| |
def recover_provider_model():
    """On page load, restore the saved provider/model selection.

    Falls back to the cached or recommended model list when the saved model
    is no longer present.
    """
    saved = load_all()
    pvd = saved.get("ui_provider", "gemini")
    models = get_models_cache(pvd) or RECOMMENDED_MODELS.get(pvd, [])
    wanted = saved.get("ui_model", "")
    value = wanted if wanted in models else (models[0] if models else "")
    return gr.update(choices=provider_choices(), value=pvd), gr.update(choices=models, value=value)


app.load(recover_provider_model, outputs=[provider, model])
|
|
def on_provider_change(pvd, cur_m):
    """Repopulate both model dropdowns when the provider changes.

    Selection preference: previously saved model > currently selected model
    (if still valid for this provider) > first available choice.
    """
    choices = get_models_cache(pvd) or RECOMMENDED_MODELS.get(pvd, [])
    saved_mdl = load_all().get("ui_model", "")
    if saved_mdl in choices:
        value = saved_mdl
    elif cur_m in choices:
        value = cur_m
    else:
        value = choices[0] if choices else ""
    debug("UI", "提供商切换", {"provider": pvd, "model_value": value})
    return gr.update(choices=choices, value=value), gr.update(choices=choices, value=value)


provider.change(on_provider_change, inputs=[provider, model], outputs=[model, tested_model_select])
|
|
def ui_refresh_models(pvd, mdl, *kv):
    """Re-query the provider's model list, then sync the tested-model picker."""
    mdl_update, info, table, list_text = refresh_models(pvd, mdl, gather_keys_func(*kv))
    cached = get_models_cache(pvd)
    value = mdl if mdl in cached else (cached[0] if cached else "")
    return mdl_update, info, table, list_text, gr.update(choices=cached, value=value)


refresh_btn.click(
    ui_refresh_models,
    inputs=[provider, model] + all_key_inputs,
    outputs=[model, model_info, model_table, model_list_text, tested_model_select],
)
|
|
def load_rpm_for_current(pvd, mdl):
    """Show the stored requests-per-minute limit for the selected model."""
    return gr.update(value=int(get_saved_rpm(pvd, mdl) if mdl else 0))


def save_current_rpm(pvd, mdl, rpm):
    """Store an RPM limit for (provider, model); both must be selected."""
    if not (pvd and mdl):
        return "❗ 请先选择提供商与模型"
    value = int(max(0, rpm or 0))  # clamp None/negatives to 0
    set_saved_rpm(pvd, mdl, value)
    return f"✅ 已保存 {pvd}:{mdl} 的 RPM = {value}"


model.change(load_rpm_for_current, inputs=[provider, model], outputs=[rpm_input])
save_rpm_btn.click(save_current_rpm, inputs=[provider, model, rpm_input], outputs=[model_status])
|
|
def run_detection_with_refresh(pvd, *kv):
    """List the provider's models, cache them, then connectivity-test each.

    Returns updates for the model dropdown, the result table, the info text,
    the tested-model picker (OK models only) and a newline-joined OK list.
    """
    keys = gather_keys_func(*kv)
    models, rec, err = get_models_list(pvd, keys)
    if models:
        set_models_cache(pvd, models)
    saved_mdl = load_all().get("ui_model", "")
    mdl_val = saved_mdl if saved_mdl in models else (models[0] if models else "")
    rows, info, ok_models = test_models(pvd, keys)
    return (
        gr.update(choices=models, value=mdl_val),
        rows,
        info,
        gr.update(choices=ok_models, value=(ok_models[0] if ok_models else None)),
        "\n".join(ok_models),
    )


check_btn.click(
    run_detection_with_refresh,
    inputs=[provider] + all_key_inputs,
    outputs=[model, model_table, model_info, tested_model_select, model_list_text],
)
|
|
def on_model_df_select(evt: gr.SelectData, table, pvd, token, hint, task, *kv):
    """Row-click handler for the model table: adopt the clicked model name.

    On a valid pick the full UI state is autosaved so the choice persists;
    placeholder rows ("-"/empty) and any error leave the model unchanged.
    """
    try:
        row = evt.index[0]
        chosen = table[row][0]
        if chosen and chosen not in {"-", ""}:
            debug("MODELS", "表格选择模型", {"model": chosen})
            save_all_ui(gather_keys_func(*kv), pvd, chosen, token, hint, last_task=task, auto=True)
            return gr.update(value=chosen), f"✅ 已选: {chosen}"
    except Exception as e:
        return gr.update(), f"❌ 选择失败: {e}"
    return gr.update(), ""


model_table.select(
    on_model_df_select,
    inputs=[model_table, provider, github_token, global_hint, task_input] + all_key_inputs,
    outputs=[model, model_status],
)
|
|
apply_tested_model_btn.click(
    lambda m: ((gr.update(value=m), f"✅ 已应用模型: {m}") if m else (gr.update(), "❗ 请选择模型")),
    inputs=[tested_model_select],
    outputs=[model, model_status],
)


def _ping_model(pvd, mdl, *kv):
    """Send a one-token probe to the selected model and report connectivity.

    FIX: the original lambda called llm_call twice per click (once for the
    startswith check, once for the error text), i.e. two network round-trips;
    the response is now fetched once and reused.
    """
    resp = str(llm_call(pvd, mdl, "请仅回复:pong", gather_keys_func(*kv), req_timeout=30))
    return "✅ 连接成功!" if not resp.startswith("❗") else resp


test_btn.click(
    _ping_model,
    inputs=[provider, model] + all_key_inputs,
    outputs=[model_status],
)
|
|
| |
# Clipboard helpers are implemented client-side via the window.copyText JS
# shim; no Python callback is involved (fn=None).
copy_model_btn.click(None, inputs=[model], js="(m)=>window.copyText(String(m||''))")
copy_list_btn.click(None, inputs=[model_list_text], js="(t)=>window.copyText(String(t||''))")
# Paste appends the clipboard text into the task textbox and fires an input
# event so Gradio notices the change.
paste_btn.click(
    None, None,
    js="""
()=>{navigator.clipboard.readText().then(t=>{
const ta=document.querySelector('#task_input textarea');
if(ta){ta.value=(ta.value||'')+t;ta.dispatchEvent(new Event('input',{bubbles:true}))}
})}
""",
)
|
|
| |
# Team table editing: append an empty row, or reload the built-in defaults.
team_add_row_btn.click(lambda rows: (rows or []) + [["", "", "", ""]], inputs=[team_df], outputs=[team_df])
team_load_default_btn.click(lambda: [[m["name"], m.get("provider", ""), m.get("model", ""), m.get("persona", "")] for m in DEFAULT_TEAM], outputs=[team_df])


def save_team_cfg(enabled, df_rows, pri, conc):
    """Persist the team configuration table and its run options."""
    return save_team_to_conf(bool(enabled), df_rows, pri, int(conc))


team_save_btn.click(save_team_cfg, inputs=[team_enable_ck, team_df, team_pri, team_conc], outputs=[team_status])
|
|
| |
def list_uploads_files_ui(limit=200) -> List[str]:
    """Newest-first listing of plain files in UPLOADS_DIR, capped at `limit`."""
    try:
        entries = sorted(Path(UPLOADS_DIR).glob("*"), key=lambda x: x.stat().st_mtime, reverse=True)
        return [str(p) for p in entries if p.is_file()][:limit]
    except Exception as e:
        debug("UPLOADS", "列出失败", {"err": str(e)})
        return []


def refresh_hist_files():
    """Repopulate the historical-uploads dropdown, preselecting the newest file."""
    names = list_uploads_files_ui()
    return gr.update(choices=names, value=([names[0]] if names else []))


hist_files_refresh_btn.click(refresh_hist_files, outputs=[hist_files_dropdown])
|
|
def apply_hist_files(selected, task, base, req, exp, cli, pvd, mdl):
    """Attach previously uploaded files to the current task and save it.

    Returns updates for the attachment list, the save status and the
    task-history dropdown.
    """
    resolved = [resolve_file_path(p) for p in (selected or []) if resolve_file_path(p)]
    status = save_task_state(task, resolved, base, req, exp, cli, pvd, mdl)
    titles = get_task_history_titles()
    hist_upd = gr.update(choices=titles, value=(titles[0] if titles else None))
    return gr.update(value=resolved), status, hist_upd


hist_files_apply_btn.click(
    apply_hist_files,
    inputs=[hist_files_dropdown, task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model],
    outputs=[attach_files, save_status, history_select],
)
|
|
def new_task():
    """Blank out all task-editing fields (draft only; nothing is persisted)."""
    return "", gr.update(value=[]), "", "", "", "", "✅ 已新建任务草稿(未保存)"


new_task_btn.click(
    new_task,
    outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status],
)
|
|
def on_files_change(files, task, base, req, exp, cli, pvd, mdl):
    """Copy newly attached files into the uploads dir and autosave the task."""
    debug("UI", "附件变更", {"count": len(files or [])})
    persisted = persist_files_to_uploads([resolve_file_path(p) for p in (files or []) if resolve_file_path(p)])
    status = save_task_state(task, persisted, base, req, exp, cli, pvd, mdl)
    titles = get_task_history_titles()
    return (
        gr.update(value=persisted),
        status,
        gr.update(choices=titles, value=(titles[0] if titles else None)),
    )


attach_files.change(
    on_files_change,
    inputs=[attach_files, task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model],
    outputs=[attach_files, save_status, history_select],
)
|
|
def auto_save_task(task, files, base, req, exp, cli, pvd, mdl):
    """Autosave the current task draft whenever any task field changes."""
    resolved = [resolve_file_path(p) for p in (files or []) if resolve_file_path(p)]
    status = save_task_state(task, resolved, base, req, exp, cli, pvd, mdl)
    titles = get_task_history_titles()
    return status, gr.update(choices=titles, value=(titles[0] if titles else None))


for comp in [task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model]:
    comp.change(
        auto_save_task,
        inputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, provider, model],
        outputs=[save_status, history_select],
    )
|
|
def do_restore_last():
    """Reload the most recently saved task, including both chat histories.

    Returns the task fields plus both chat histories as raw strings and as
    rendered HTML.
    """
    state = restore_last_task_state()
    coder_hist = state.get("chat_history", "")
    general_hist = state.get("chat_history_general", "")
    return (
        state.get("task", ""),
        gr.update(value=state.get("files", [])),
        state.get("baseline_code", ""),
        state.get("required_kws", ""),
        state.get("expected_stdout", ""),
        state.get("cli_args", ""),
        "✅ 已恢复最近任务",
        coder_hist, render_chat_html(coder_hist),
        general_hist, render_chat_html(general_hist),
    )


restore_last_btn.click(
    do_restore_last,
    outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status,
             history_coder_str, coder_chat_html, history_general_str, chat_html],
)
|
|
def do_load_history(title):
    """Load the task-history entry chosen via its "N | title" dropdown label.

    Returns the task fields plus both chat histories (raw string + rendered
    HTML). Falls back to index 0 when the label cannot be parsed.
    """
    titles = get_task_history_titles()
    if not titles:
        # NOTE(review): .value reads the components' *initial* values, not the
        # live per-session state — confirm this fallback is intentional.
        return (gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
                "ℹ️ 暂无历史", history_coder_str.value, render_chat_html(history_coder_str.value),
                history_general_str.value, render_chat_html(history_general_str.value))
    try:
        # Labels look like "3 | some task…"; the leading number is 1-based.
        idx = int(title.split("|")[0].strip()) - 1 if title else 0
    except Exception:
        idx = 0
    data = get_task_history_by_index(idx)
    coder_hist = data.get("chat_history", "")
    general_hist = data.get("chat_history_general", "")
    return (
        data.get("task", ""),
        gr.update(value=data.get("files", [])),
        data.get("baseline_code", ""),
        data.get("required_kws", ""),
        data.get("expected_stdout", ""),
        data.get("cli_args", ""),
        "✅ 已载入历史",
        coder_hist, render_chat_html(coder_hist),
        general_hist, render_chat_html(general_hist),
    )


load_history_btn.click(
    do_load_history,
    inputs=[history_select],
    outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status,
             history_coder_str, coder_chat_html, history_general_str, chat_html],
)
|
|
| |
def insert_files_into_message(paths, prev_text):
    """Append attachment contents to a chat draft as fenced code blocks.

    Text files are embedded (first 200 kB, UTF-8 with replacement chars);
    binary files are referenced by path only. Returns a textbox update.
    """
    blocks = []
    for raw in paths or []:
        fp = resolve_file_path(raw)
        if not fp:
            continue
        try:
            name = Path(fp).name
            if Path(fp).suffix.lower() not in TEXT_EXTS and not looks_text(fp):
                blocks.append(f"文件: {name}\n<非文本/二进制文件,已记录路径:{fp}>")
                continue
            text = Path(fp).read_bytes()[:200_000].decode("utf-8", "replace")
            fence = "python" if name.endswith(".py") else ""
            blocks.append(f"文件: {name}\n```{fence}\n{text}\n```")
        except Exception as e:
            blocks.append(f"文件: {Path(fp).name}\n<读取失败: {e}>")
    combined = prev_text or ""
    if blocks:
        combined = (combined + ("\n\n" if combined else "") + "\n\n".join(blocks)).strip()
    return gr.update(value=combined)


insert_files_btn.click(insert_files_into_message, inputs=[chat_files, user_in], outputs=[user_in])
coder_insert_files_btn.click(insert_files_into_message, inputs=[coder_chat_files, coder_user_in], outputs=[coder_user_in])
|
|
# Coder-chat send: run the shared chat handler, persist the history, then
# clear the input box.
coder_send_btn.click(
    chat_send_common,
    inputs=[history_coder_str, coder_user_in, coder_chat_files, attach_files, provider, model, last_run_summary,
            task_input, required_kws, expected_stdout, coder_enable_shell, coder_full_shell] + all_key_inputs,
    outputs=[history_coder_str, coder_chat_html, last_ai_response, last_chat_payload_coder],
).then(
    lambda h: persist_chat_history(h),
    inputs=[history_coder_str],
    outputs=[coder_chat_status],
).then(lambda: "", outputs=[coder_user_in])


# Retry re-sends the last recorded payload through the shared retry handler.
coder_retry_btn.click(
    chat_retry_common,
    inputs=[last_chat_payload_coder, history_coder_str, attach_files, provider, model, last_run_summary,
            task_input, required_kws, expected_stdout, coder_enable_shell, coder_full_shell] + all_key_inputs,
    outputs=[history_coder_str, coder_chat_html, last_ai_response],
).then(lambda h: persist_chat_history(h), inputs=[history_coder_str], outputs=[coder_chat_status])


coder_stop_chat_btn.click(lambda: "⏹ 已请求停止聊天(当前请求无法中断,将尽快返回)", outputs=[coder_chat_status])
coder_copy_msg_btn.click(None, inputs=[coder_user_in], js="(t)=>window.copyText(String(t||''))")
# Copy the whole conversation client-side; history entries are '|||'-delimited
# with alternating user/AI parts.
coder_copy_chat_btn.click(
    None,
    inputs=[history_coder_str],
    js="""
(h)=>{
const parts=String(h||'').split('|||'); let lines=[];
for(let i=1;i<parts.length;i+=2){
const u=parts[i]||'', a=parts[i+1]||'';
lines.push(`【用户】\\n${u}\\n\\n【AI】\\n${a}`); lines.push('-'.repeat(60));
}
window.copyText(lines.join('\\n'));
}
""",
)
|
|
def export_chat(hist: str, label: str):
    """Dump a chat history to a timestamped TXT file under LOGS_DIR.

    Returns (file path, download-link HTML) on success, otherwise
    (None, error HTML).
    """
    text = plain_chat_text(hist)
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    target = Path(LOGS_DIR) / f"chat-{label}-{stamp}.txt"
    try:
        target.write_text(text, "utf-8")
    except Exception as e:
        return None, f"<i>导出失败: {e}</i>"
    return str(target), build_file_link_html(str(target), f"下载会话({label})")


coder_download_chat_btn.click(
    lambda h: export_chat(h, "任务对话"),
    inputs=[history_coder_str],
    outputs=[coder_download_chat_file, coder_chat_download_link],
)
|
|
| |
# General-chat send: shared chat handler, then persist history, then clear
# the input box.
send_btn.click(
    chat_send_common,
    inputs=[history_general_str, user_in, chat_files, attach_files, provider, model, last_run_summary,
            task_input, required_kws, expected_stdout, chat_enable_shell, chat_full_shell] + all_key_inputs,
    outputs=[history_general_str, chat_html, last_ai_response, last_chat_payload_general],
).then(lambda h: persist_general_chat_history(h), inputs=[history_general_str], outputs=[send_status]).then(lambda: "", outputs=[user_in])


chat_retry_btn.click(
    chat_retry_common,
    inputs=[last_chat_payload_general, history_general_str, attach_files, provider, model, last_run_summary,
            task_input, required_kws, expected_stdout, chat_enable_shell, chat_full_shell] + all_key_inputs,
    outputs=[history_general_str, chat_html, last_ai_response],
).then(lambda h: persist_general_chat_history(h), inputs=[history_general_str], outputs=[send_status])


stop_chat_btn.click(lambda: "⏹ 已请求停止聊天(当前请求无法中断,将尽快返回)", outputs=[send_status])
|
|
def refresh_coder_turn_choices(hist: str):
    """Rebuild the "quote a coder-chat turn" dropdown and its preview text."""
    options = chat_choices_from_hist(hist)
    first = options[0] if options else None
    preview = preview_chat_by_choice(hist, first) if first else "(没有编程对话历史)"
    return gr.update(choices=options, value=first), preview


app.load(lambda: refresh_coder_turn_choices(LAST_CHAT_CODER), outputs=[import_coder_turn_select, import_coder_turn_preview])
history_coder_str.change(refresh_coder_turn_choices, inputs=[history_coder_str], outputs=[import_coder_turn_select, import_coder_turn_preview])
import_coder_turn_select.change(
    lambda h, c: preview_chat_by_choice(h, c),
    inputs=[history_coder_str, import_coder_turn_select],
    outputs=[import_coder_turn_preview],
)
# FIX: the original conditional bound the whole expression —
# `(prev or "") + ("…" + txt) if txt else ""` — so clicking with no quote
# selected replaced the user's draft with "". Now the draft is kept.
import_coder_turn_btn.click(
    lambda prev, txt: gr.update(value=((prev or "") + "\n\n【引用自编程对话】\n" + txt) if txt else (prev or "")),
    inputs=[user_in, import_coder_turn_preview],
    outputs=[user_in],
)
|
|
# Clipboard/clear/export wiring for the general chat.
copy_msg_btn.click(None, inputs=[user_in], js="(t)=>window.copyText(String(t||''))")
# Client-side copy of the full conversation ('|||'-delimited user/AI pairs).
copy_chat_btn.click(
    None,
    inputs=[history_general_str],
    js="""
(h)=>{
const parts=String(h||'').split('|||'); let lines=[];
for(let i=1;i<parts.length;i+=2){
const u=parts[i]||'', a=parts[i+1]||'';
lines.push(`【用户】\\n${u}\\n\\n【AI】\\n${a}`); lines.push('-'.repeat(60));
}
window.copyText(lines.join('\\n'));
}
""",
)
clear_btn.click(lambda: ("", render_chat_html(""), "", "🧹 已清空智能对话"), outputs=[history_general_str, chat_html, last_ai_response, send_status])
download_chat_btn.click(lambda h: export_chat(h, "智能对话"), inputs=[history_general_str], outputs=[download_chat_file, chat_download_link])


# Forward the last AI answer into the coder task box (append or initialize).
send_to_coder_btn.click(
    lambda last, cur: (
        gr.update(value=(cur + ("\n\n=== 来自智能对话的需求 ===\n" + last) if cur else ("基于以下对话内容生成代码:\n\n" + last))),
        ("✅ 已追加到编程任务" if cur else "✅ 已设置为编程任务"),
    ),
    inputs=[last_ai_response, task_input],
    outputs=[task_input, send_status],
)
coder_send_to_coder_btn.click(
    lambda last, cur: (
        gr.update(value=(cur + ("\n\n=== 来自对话的需求 ===\n" + last) if cur else ("基于以下对话内容生成代码:\n\n" + last))),
        ("✅ 已追加到编程任务" if cur else "✅ 已设置为编程任务"),
    ),
    inputs=[last_ai_response, task_input],
    outputs=[task_input, coder_chat_status],
)
|
|
| |
def to_member_dicts(rows):
    """Convert team-table rows ([name, provider, model, persona]) to dicts.

    Rows that are not 4-element lists, or have an empty name, are skipped.
    Falls back to DEFAULT_TEAM when nothing usable remains.
    """
    members = []
    for row in rows or []:
        if not (isinstance(row, list) and len(row) >= 4):
            continue
        name = (row[0] or "").strip()
        if not name:
            continue
        members.append({
            "name": name,
            "provider": (row[1] or "").strip(),
            "model": (row[2] or "").strip(),
            "persona": (row[3] or "").strip(),
        })
    return members or DEFAULT_TEAM
|
|
def do_run(
    pvd, mdl, task_text, files, ma, ts, req, exp, cli, base_code, hint,
    team_enabled, team_rows, team_pri_txt, team_conc_val, *kv,
):
    """Run one orchestration pass and map its result dict onto the UI outputs.

    *kv are the raw API-key textbox values in all_key_inputs order;
    gather_keys_func folds them into a provider->key dict. The task state is
    persisted BEFORE running so a crash does not lose the user's inputs.
    Returns the 9-tuple expected by run_btn's outputs list.
    """
    keys = gather_keys_func(*kv)
    # Persist the current task (resolving uploaded file paths first).
    save_task_state(
        task_text,
        [resolve_file_path(p) for p in (files or []) if resolve_file_path(p)],
        base_code, req, exp, cli, pvd, mdl, add_history=True,
    )
    res = orchestrate(
        task=task_text, global_hint=hint, provider=pvd, model=mdl, keys=keys, files=files,
        max_attempts=int(ma), timeout=int(ts), required_kws_text=req, expected_out_contains=exp,
        cli_args=cli, baseline_code=base_code,
        team_enabled=bool(team_enabled),
        team_members=to_member_dicts(team_rows),
        # Fall back to the default escalation order when the textbox is empty.
        team_priority=(team_pri_txt or "run>code>reflect>stop"),
        # Clamp concurrency to a sane 1..16 range.
        team_max_conc=int(max(1, min(16, team_conc_val or 4))),
    )
    code_path = res.get("download_main") or ""
    zip_path = res.get("zip_path") or ""
    return (
        res.get("code", ""),
        res.get("status", ""),
        res.get("attempts", 0),
        res.get("stdout", ""),
        res.get("stderr", ""),
        # Only feed the File components paths that actually exist on disk.
        gr.update(value=code_path if code_path and os.path.exists(code_path) else None),
        gr.update(value=zip_path if zip_path and os.path.exists(zip_path) else None),
        res.get("workdir", ""),
        res.get("logs", ""),
    )
|
|
# Main "run" wiring: do_run consumes every configuration widget plus all API
# key inputs (appended via list concatenation) and fans out to nine outputs.
run_evt = run_btn.click(
    do_run,
    inputs=[provider, model, task_input, attach_files, max_attempts, timeout_sec, required_kws, expected_stdout,
            cli_args, baseline_code, global_hint, team_enable_ck, team_df, team_pri, team_conc] + all_key_inputs,
    outputs=[code_out, status_out, attempts_out, stdout_out, stderr_out, download_file, download_zip, run_folder, debug_logs_display],
)


# After the run completes, rebuild the HTML download links from the outputs.
run_evt.then(
    update_run_links,
    inputs=[code_out, status_out, attempts_out, stdout_out, stderr_out, download_file, download_zip, run_folder, debug_logs_display],
    outputs=[download_main_link, download_zip_link],
)


# Also derive a compact text summary (status, attempts, run-dir name, and
# 400-char snippets of stdout/stderr) for the last-run panel.
run_evt.then(
    lambda status, attempts, stdout, stderr, run_dir, code: (
        (lambda out_snip, err_snip, run_name:
            f"状态: {status} | 尝试: {attempts}\n运行目录: {run_name}\n--- STDOUT 片段 ---\n{out_snip}\n--- STDERR 片段 ---\n{err_snip}")(
            (stdout or "").strip()[:400] + ("…" if len((stdout or "").strip()) > 400 else ""),
            (stderr or "").strip()[:400] + ("…" if len((stderr or "").strip()) > 400 else ""),
            (Path(run_dir).name if (run_dir or "").strip() else "(未生成)"),
        )
    ),
    inputs=[status_out, attempts_out, stdout_out, stderr_out, run_folder, code_out],
    outputs=[last_run_summary],
)
|
|
def stop_watch():
    """Flag the live-log streaming loop (see stream_logs) to terminate.

    Returns a short status message for the UI.
    """
    global WATCH_STOP
    WATCH_STOP = True
    message = "🛑 已停止查看实时日志"
    return message
|
|
# Wire the stop button; the returned message lands in the status output.
stop_watch_btn.click(stop_watch, outputs=[status_out])
|
|
def stream_logs():
    """Generator that streams the current run's combined logs to the UI.

    Polls stdout/stderr/pip logs under CURRENT_RUN_DIR every 0.5 seconds and
    yields a combined text block only when the content changed since the last
    emit (avoids flooding the frontend with identical updates). Terminates
    when stop_watch() sets the module-level WATCH_STOP flag.
    """
    global WATCH_STOP
    WATCH_STOP = False
    last_emit = ""
    while not WATCH_STOP:
        # read_tail caps the bytes read per file so huge logs stay cheap.
        stdout_t = read_tail(str(Path(CURRENT_RUN_DIR, "stdout.txt")), 800_000) if CURRENT_RUN_DIR else ""
        stderr_t = read_tail(str(Path(CURRENT_RUN_DIR, "stderr.txt")), 800_000) if CURRENT_RUN_DIR else ""
        pip_t = read_tail(str(Path(CURRENT_RUN_DIR, "pip.log")), 400_000) if CURRENT_RUN_DIR else ""
        composed = [
            "=== [STDOUT] ===\n" + (stdout_t or "(空)"),
            "\n=== [STDERR] ===\n" + (stderr_t or "(空)"),
            "\n=== [pip.log] ===\n" + (pip_t or "(空)"),
        ]
        txt = "\n".join(composed)
        if txt != last_emit:
            last_emit = txt
            yield txt
        time.sleep(0.5)
    # Final emit so the UI keeps the last snapshot after stopping.
    yield last_emit or "(暂无日志)"
|
|
# Live-log controls: start streaming into live_log, or stop the whole run.
watch_btn.click(stream_logs, outputs=[live_log])
stop_btn.click(lambda: stop_all(), outputs=[status_out])


# Debug-buffer panel: refresh from the in-memory buffer, copy via browser JS,
# or clear it (DEBUG_BUFFER.clear() returns None, so `or True) and ""` makes
# the handler yield an empty string for the display).
refresh_debug_btn.click(lambda: get_debug_text(), outputs=[debug_logs_display])
copy_debug_btn.click(None, inputs=[debug_logs_display], js="(t)=>window.copyText(String(t||''))")
clear_debug_btn.click(lambda: (DEBUG_BUFFER.clear() or True) and "", outputs=[debug_logs_display])
|
|
def list_log_files_ui() -> List[str]:
    """Return the *.log filenames under LOGS_DIR, newest first.

    Any failure (missing dir, stat error) is logged and yields an empty list.
    """
    try:
        newest_first = sorted(
            Path(LOGS_DIR).glob("*.log"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        return [p.name for p in newest_first]
    except Exception as e:
        debug("LOGS", "列出失败", {"err": str(e)})
        return []
|
|
def get_log_info_ui(fn: str) -> str:
    """One-line summary (name, size, mtime) for log file *fn* in LOGS_DIR.

    Returns "文件不存在" for a missing file and an error string on failure.
    """
    try:
        path = Path(LOGS_DIR) / fn
        if not path.exists():
            return "文件不存在"
        nbytes = path.stat().st_size
        shown = f"{nbytes/1024:.2f}KB" if nbytes > 1024 else f"{nbytes}B"
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(path.stat().st_mtime))
        return f"🗎 {fn} | 大小: {shown} | 修改: {stamp}"
    except Exception as e:
        return f"获取信息失败: {e}"
|
|
def download_log_file_ui(fn: str):
    """Return the path string for log *fn* when it exists, else None."""
    candidate = Path(LOGS_DIR) / fn
    if not candidate.exists():
        return None
    return str(candidate)
|
|
def read_log_tail_ui(fn: str, max_bytes=500_000) -> str:
    """Return at most the last *max_bytes* bytes of log *fn*, UTF-8 decoded.

    Fix: the original read the ENTIRE file into memory with read_bytes() and
    then sliced the tail, which is wasteful for multi-GB logs. We now stat the
    size and seek() directly to the tail, reading only what is returned.

    Returns "文件不存在" for a missing file and "<读取失败: …>" on any error.
    """
    try:
        p = Path(LOGS_DIR) / fn
        if not p.exists():
            return "文件不存在"
        size = p.stat().st_size
        with p.open("rb") as f:
            if size > max_bytes:
                f.seek(size - max_bytes)
            b = f.read(max_bytes)
        return b.decode("utf-8", "replace")
    except Exception as e:
        return f"<读取失败: {e}>"
|
|
def clear_all_logs_ui():
    """Delete all *.log files under LOGS_DIR and reset the log-browser widgets.

    Returns the 5-tuple for (status_out, log_selector, log_info, download_log,
    logs_download_link).

    Fix: the original returned a bare list of filenames into the log_selector
    slot; Gradio interprets an event-handler return as the component *value*,
    not its choices, so the dropdown kept stale choices after clearing. We now
    return gr.update(choices=..., value=...) like refresh_log_list does.
    """
    cnt = 0
    for p in Path(LOGS_DIR).glob("*.log"):
        try:
            p.unlink()
            cnt += 1
        except Exception:
            # Best-effort delete: a locked/vanished file should not abort the sweep.
            pass
    remaining = list_log_files_ui()
    return (
        f"🧹 已清空 {cnt} 个日志",
        gr.update(choices=remaining, value=(remaining[0] if remaining else None)),
        "",
        None,
        "<i>(已清空)</i>",
    )
|
|
def refresh_log_list():
    """Repopulate the log dropdown, pre-selecting the newest file if any."""
    names = list_log_files_ui()
    debug("LOGS", "刷新日志列表", {"count": len(names)})
    selected = names[0] if names else None
    return gr.update(choices=names, value=selected)
|
|
# Manual refresh of the log-file dropdown.
logs_refresh_btn.click(refresh_log_list, outputs=[log_selector])
|
|
def log_selector_change(fn):
    """Refresh info line, tail preview, download widget and link for log *fn*."""
    summary = get_log_info_ui(fn)
    tail_text = read_log_tail_ui(fn)
    path = download_log_file_ui(fn)
    if path:
        link_html = build_file_link_html(path, f"下载 {fn}")
        file_update = gr.update(value=path)
    else:
        link_html = "<i>(请选择日志)</i>"
        file_update = gr.update()
    return summary, tail_text, file_update, link_html
|
|
# Selecting a log updates its info line, tail preview, file widget and link.
log_selector.change(log_selector_change, inputs=[log_selector],
                    outputs=[log_info, log_preview, download_log, logs_download_link])


# Explicit download button: re-resolve the file path and rebuild the link.
logs_download_btn.click(
    lambda fn: (
        gr.update(value=download_log_file_ui(fn)) if fn else gr.update(),
        (build_file_link_html(download_log_file_ui(fn), f"下载 {fn}")
         if fn and download_log_file_ui(fn) else "<i>(无文件)</i>"),
    ),
    inputs=[log_selector],
    outputs=[download_log, logs_download_link],
)
logs_clear_btn.click(clear_all_logs_ui, outputs=[status_out, log_selector, log_info, download_log, logs_download_link])


# Package all logs (plus the current run dir) into one ZIP for download.
download_all_btn.click(
    lambda: (lambda z: (gr.update(value=z if z and os.path.exists(z) else None),
                        (build_file_link_html(z, "下载全部日志 ZIP") if z else "<i>(打包失败)</i>")))(package_all_logs(CURRENT_RUN_DIR)),
    outputs=[download_all_file, download_all_link],
)
|
|
def pack_and_show(run_dir):
    """Zip *run_dir* and return (status text, File-widget update, link html)."""
    zpath = package_run_dir(run_dir)
    if zpath:
        status = "✅ 已打包: " + zpath
        link = build_file_link_html(zpath, "下载运行目录 ZIP")
    else:
        status = "❌ 打包失败/无目录"
        link = "<i>(无可下载ZIP)</i>"
    widget_value = zpath if zpath and os.path.exists(zpath) else None
    return status, gr.update(value=widget_value), link
|
|
# Zip the selected run directory and surface status + download widgets.
pack_btn.click(pack_and_show, inputs=[run_folder], outputs=[pack_status, download_zip, download_zip_link])
|
|
| try: |
| |
def ai_suggest_commands(nl_task: str, provider: str, model: str, keys: dict) -> Tuple[str, List[str]]:
    """Ask the LLM for a JSON list of shell commands that satisfy *nl_task*.

    Returns (display text, command list). On valid JSON the display text is
    the pretty-printed JSON; otherwise the raw reply is shown and every
    non-empty, non-comment line of it is treated as a command (fallback).
    """
    if not (nl_task or "").strip():
        return "❗ 请输入要执行的任务描述", []
    prompt = f"""你是DevOps助手。根据用户需求,生成在Linux环境中执行的命令列表(尽量使用python -m pip而非pip),只返回JSON:
{{
"commands": ["python -m pip install 包1", "python -m pip install 包2"]
}}
不要解释。用户需求:{nl_task}"""
    out = llm_call(provider, model, prompt, keys, req_timeout=60)
    try:
        parsed = json.loads(out)
        cmds = parsed.get("commands", [])
        if not isinstance(cmds, list):
            cmds = []
        return json.dumps(parsed, ensure_ascii=False, indent=2), cmds
    except Exception:
        # Reply was not a JSON object: fall back to line-splitting it.
        fallback = [ln.strip() for ln in str(out).splitlines()
                    if ln.strip() and not ln.strip().startswith("#")]
        return out, fallback
|
|
def refresh_cmd_choices(src, coder_hist, general_hist):
    """Rebuild the chat-selection dropdown for the chosen context source.

    Returns (dropdown update with the first choice pre-selected, preview text).
    """
    hist = coder_hist if src == "编程对话" else general_hist
    options = chat_choices_from_hist(hist)
    picked = options[0] if options else None
    text = preview_chat_by_choice(hist, picked) if picked else "(没有可用历史)"
    return gr.update(choices=options, value=picked), text
|
|
# Initial population of the chat-for-commands dropdown from the cached
# coder-chat history.
app.load(lambda: refresh_cmd_choices("编程对话", LAST_CHAT_CODER, LAST_CHAT_GENERAL),
         outputs=[chat_for_cmd_select, selected_chat_preview])


# Switching the context source re-derives choices from the matching history.
cmd_ctx_src.change(
    refresh_cmd_choices,
    inputs=[cmd_ctx_src, history_coder_str, history_general_str],
    outputs=[chat_for_cmd_select, selected_chat_preview],
)


# Preview the chosen turn from whichever history is currently selected.
chat_for_cmd_select.change(
    lambda src, coder_hist, general_hist, choice:
        preview_chat_by_choice(coder_hist if src == "编程对话" else general_hist, choice),
    inputs=[cmd_ctx_src, history_coder_str, history_general_str, chat_for_cmd_select],
    outputs=[selected_chat_preview],
)


# Generate commands from ONE selected chat turn plus the last-run summary.
ai_from_chat_btn.click(
    lambda src, coder_hist, general_hist, choice, pvd, mdl, last_sum, *kv: (
        (lambda hist: (lambda j, cmds: ("✅ 已基于对话生成命令(请审阅后执行)", j, cmds))(
            *ai_suggest_commands(
                f"""根据以下对话内容,生成用于“修复/运行/准备环境”的命令列表(Linux),请尽量使用 python -m pip 而非 pip,仅返回JSON:
{{
"commands": ["python -m pip install 包1", "python -m pip install 包2"]
}}
不要解释。

【最近任务摘要】\n{(last_sum or '').strip() or '(无)'}\n\n{preview_chat_by_choice(hist, choice)}""",
                pvd, mdl, gather_keys_func(*kv),
            )
        ))(coder_hist if src == "编程对话" else general_hist)
    ),
    inputs=[cmd_ctx_src, history_coder_str, history_general_str, chat_for_cmd_select, provider, model, last_run_summary] + all_key_inputs,
    outputs=[shell_output, ai_cmds_json, ai_cmds_state],
)


# Generate commands from the ENTIRE selected history, serialized into
# user/assistant turn pairs.
ai_from_all_chat_btn.click(
    lambda src, coder_hist, general_hist, pvd, mdl, last_sum, *kv:
        (lambda content: ai_suggest_commands(
            f"""根据整段历史对话,生成用于“修复/运行/准备环境”的命令列表(Linux),要求:
- 尽量使用 python -m pip 而非 pip
- 谨慎使用 &&、|、重定向,默认不使用,除非必须
- 只返回JSON(包含 commands 数组)

输入对话:
{content}

最近任务摘要(可选):
{last_sum or '(无)'}""",
            pvd, mdl, gather_keys_func(*kv),
        ))("\n\n".join([f"【用户】\n{t['user']}\n\n【AI】\n{t['assistant']}" for t in parse_chat_history(coder_hist if src == "编程对话" else general_hist)])),
    inputs=[cmd_ctx_src, history_coder_str, history_general_str, provider, model, last_run_summary] + all_key_inputs,
    outputs=[ai_cmds_json, shell_output],
)


# Generate commands from a free-text task description.
ai_suggest_btn.click(
    lambda nl, pvd, mdl, *kv: (lambda res: ("✅ 已生成命令(请审阅后执行)", res[0], res[1]))(
        ai_suggest_commands(nl, pvd, mdl, gather_keys_func(*kv))),
    inputs=[ai_task, provider, model] + all_key_inputs,
    outputs=[shell_output, ai_cmds_json, ai_cmds_state],
)


# Execute the reviewed commands; requires the explicit opt-in checkbox. A
# non-empty JSON textbox takes precedence over the cached command list.
ai_run_btn.click(
    lambda json_text, cmds, enable, full_shell:
        ("❗ 未勾选“允许AI执行命令”" if not enable else
         (lambda commands: run_commands(commands, bool(full_shell)))(
             json.loads(json_text).get("commands", cmds) if (json_text or "").strip() else (cmds or [])
         )),
    inputs=[ai_cmds_json, ai_cmds_state, ai_enable_shell, ai_full_shell],
    outputs=[shell_output],
)
|
|
except NameError as e:
    # The command-execution panel components may not exist in every layout;
    # in that case skip wiring these events instead of crashing at build time.
    debug("UI", "命令执行面板组件未定义,相关事件未绑定", {"err": str(e)})
| |
def list_log_files_loader():
    """Startup loader for the log dropdown: newest-first *.log names."""
    names = []
    try:
        newest_first = sorted(
            Path(LOGS_DIR).glob("*.log"),
            key=lambda x: x.stat().st_mtime,
            reverse=True,
        )
        names = [p.name for p in newest_first]
    except Exception as e:
        debug("LOGS", "列出失败", {"err": str(e)})
    return gr.update(choices=names, value=(names[0] if names else None))
|
|
# Startup loaders: log list, today's debug-log tail, uploads picker, cached
# chat panes, provider list, theme settings, and all persisted API keys.
app.load(list_log_files_loader, outputs=[log_selector])
app.load(
    lambda: (read_tail(str(Path(LOGS_DIR) / f"debug-{datetime.now().strftime('%Y%m%d')}.log")) if Path(LOGS_DIR).exists() else ""),
    outputs=[debug_logs_display],
)
app.load(
    lambda: (lambda ch: gr.update(choices=ch, value=([ch[0]] if ch else [])))(list_uploads_files_ui()),
    outputs=[hist_files_dropdown],
)
app.load(
    lambda: (render_chat_html(LAST_CHAT_CODER), render_chat_html(LAST_CHAT_GENERAL), LAST_CHAT_CODER, LAST_CHAT_GENERAL),
    outputs=[coder_chat_html, chat_html, history_coder_str, history_general_str],
)
app.load(ui_refresh_provider_and_list, outputs=[provider, custom_list])
app.load(preload_theme_settings, outputs=[bg_mode, dynamic_css_html])
# Restore every saved key/base/deployment value; the list order MUST match
# all_key_inputs exactly (the two are positionally paired).
app.load(
    lambda: (lambda k=load_all(): [
        k.get("gemini_key", ""), k.get("openai_key", ""), k.get("openai_base", DEFAULT_BASES["openai"]),
        k.get("anthropic_key", ""), k.get("cohere_key", ""), k.get("groq_key", ""), k.get("groq_base", DEFAULT_BASES["groq"]),
        k.get("mistral_key", ""), k.get("mistral_base", DEFAULT_BASES["mistral"]), k.get("deepseek_key", ""),
        k.get("deepseek_base", DEFAULT_BASES["deepseek"]), k.get("openrouter_key", ""), k.get("openrouter_base", DEFAULT_BASES["openrouter"]),
        k.get("openrouter_referer", ""), k.get("openrouter_title", ""), k.get("perplexity_key", ""), k.get("perplexity_base", DEFAULT_BASES["perplexity"]),
        k.get("xai_key", ""), k.get("xai_base", DEFAULT_BASES["xai"]), k.get("azure_key", ""), k.get("azure_base", DEFAULT_BASES["azure"]),
        k.get("azure_deployment", ""), k.get("azure_version", "2024-02-15-preview"), k.get("hf_token", ""), k.get("github_token", ""),
        k.get("siliconflow_key", ""), k.get("siliconflow_base", DEFAULT_BASES["siliconflow"]),
    ])(),
    outputs=all_key_inputs,
)
|
|
| |
def _llm_call_fixed(provider: str, model: str, prompt: str, keys: dict, req_timeout: int = DEFAULT_REQ_TIMEOUT) -> str:
    """Wrapper around llm_call that logs and stringifies unexpected errors.

    Guarantees a str return even when the underlying call raises, so UI
    handlers never see an exception bubble up.
    """
    try:
        return llm_call(provider, model, prompt, keys, req_timeout=req_timeout)
    except Exception as exc:
        debug("LLM_ERR", "外层异常", {"err": str(exc), "trace": traceback.format_exc()})
        return f"❗ 请求异常:{exc}"
| |
| |
|
|
| |
|
|
| |
# Install a browser-side global error hook so frontend JS failures surface in
# the devtools console (helps debug custom-JS issues in the Gradio frontend).
app.load(None, None, None, js="""
() => {
window.addEventListener('error', (e) => {
console.error('Global error:', e.message, e.filename, e.lineno);
});
console.log('Error logging enabled');
}
""")
|
|
if __name__ == "__main__":
    # Announce startup with the resolved storage root for diagnostics.
    debug("INFO", "应用启动", {"storage_root": STORAGE_ROOT})
    app.queue()
    port = int(os.getenv("PORT", "7860"))
    try:
        app.launch(
            server_name="0.0.0.0",
            server_port=port,
            # Whitelist every directory the file/link widgets serve from.
            allowed_paths=[STORAGE_ROOT, RUN_ROOT, LOGS_DIR, PROJECT_ROOT, UPLOADS_DIR],
        )
    except TypeError:
        # Older Gradio versions do not accept allowed_paths; retry without it.
        app.launch(server_name="0.0.0.0", server_port=port)