killer2 / debug_ui.py
JackWPP's picture
feat: 增强调试界面和角色代理逻辑,优化时间戳处理和信息校验协议
6a16262
from __future__ import annotations

import json
import math
import os
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
# Directory that contains this module; the other paths are derived from it.
BASE_DIR = Path(__file__).resolve().parent
# Static front-end files (index.html, app.js, styles.css) served under /assets.
ASSETS_DIR = BASE_DIR / "web_debug"
# Root directory scanned for session logs. Taken from the LOGS_DIR environment
# variable, then TELEMETRY_DIR, then "<module dir>/logs"; an empty variable
# falls through to the next choice because of the `or` chain.
LOGS_DIR = Path(os.getenv("LOGS_DIR") or os.getenv("TELEMETRY_DIR") or (BASE_DIR / "logs")).resolve()
# The auto-generated interactive docs pages are switched off (both URLs None).
app = FastAPI(title="Werewolf Debug UI", docs_url=None, redoc_url=None)
# Expose the static front-end files at the /assets URL prefix.
app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
def _ts_value(raw: Any) -> float:
    """Coerce a raw ``ts`` field into a finite float usable as a sort key.

    Args:
        raw: The timestamp as found in a log record. Numbers and numeric
            strings are accepted; any other type is treated as missing.

    Returns:
        The timestamp as a float, or ``0.0`` when ``raw`` is ``None``, has an
        unsupported type, cannot be parsed, overflows a float, or is not
        finite (NaN / infinity).
    """
    if not isinstance(raw, (int, float, str)):
        return 0.0
    try:
        value = float(raw)
    except (ValueError, OverflowError):
        # ValueError: non-numeric string. OverflowError: an integer too large
        # for a float (JSON permits arbitrarily large integer literals).
        return 0.0
    # NaN compares false against everything, which makes sort order and the
    # "newer than" filter meaningless; NaN/inf are also not valid strict JSON.
    return value if math.isfinite(value) else 0.0
def _safe_session_path(session_id: str) -> Path:
    """Resolve ``session_id`` to an existing session directory below ``LOGS_DIR``.

    Args:
        session_id: Name of the session directory, as received from the URL.

    Returns:
        The resolved path of the session directory.

    Raises:
        HTTPException: 400 when the id is empty, contains a path separator, or
            resolves to a location that is not strictly inside ``LOGS_DIR``;
            404 when the resolved path is not an existing directory.
    """
    if not session_id or "/" in session_id or "\\" in session_id:
        raise HTTPException(status_code=400, detail="invalid session id")
    session_path = (LOGS_DIR / session_id).resolve()
    # Compare whole path components rather than string prefixes: a prefix test
    # accepts a sibling such as "<LOGS_DIR>_old" (reachable via a symlink) and
    # also LOGS_DIR itself (session id "."), which is not a session.
    if LOGS_DIR not in session_path.parents:
        raise HTTPException(status_code=400, detail="invalid session id")
    # is_dir() is already False for a path that does not exist.
    if not session_path.is_dir():
        raise HTTPException(status_code=404, detail="session not found")
    return session_path
def _iter_jsonl(path: Path) -> Iterable[Dict[str, Any]]:
    """Yield the JSON objects stored one per line in a JSON Lines file.

    Blank lines, lines that are not valid JSON, and lines whose JSON value is
    not an object are skipped. Undecodable bytes are replaced rather than
    raising. If the file cannot be opened or read, iteration simply ends.

    Args:
        path: Location of the ``.jsonl`` file.

    Yields:
        One dict per well-formed object line, in file order.
    """
    try:
        with path.open("r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Consumers call ``record.get(...)``; a bare list, number or
                # string line would raise AttributeError there, so drop it.
                if isinstance(record, dict):
                    yield record
    except OSError:
        return
def _collect_events(session_path: Path) -> List[Dict[str, Any]]:
    """Load every event found under a session directory, ordered by timestamp.

    All ``*.jsonl`` files below ``session_path`` (searched recursively) are
    read in sorted path order, and the combined records are then sorted by
    their ``ts`` field. The sort is stable, so records with equal timestamps
    keep their read order.

    Args:
        session_path: Directory of the session to read.

    Returns:
        A new list of event records in ascending timestamp order.
    """
    log_files = sorted(session_path.rglob("*.jsonl"))
    gathered: List[Dict[str, Any]] = [
        record for log_file in log_files for record in _iter_jsonl(log_file)
    ]
    return sorted(gathered, key=lambda record: _ts_value(record.get("ts")))
def _filter_after_ts(events: List[Dict[str, Any]], after_ts: Optional[float]) -> List[Dict[str, Any]]:
    """Keep only the events whose timestamp is strictly greater than ``after_ts``.

    Args:
        events: Event records to filter.
        after_ts: Exclusive lower bound on the ``ts`` field, or ``None`` for
            no filtering.

    Returns:
        ``events`` itself when ``after_ts`` is ``None``; otherwise a new list
        with the matching events in their original order.
    """
    if after_ts is None:
        return events
    return [event for event in events if _ts_value(event.get("ts")) > after_ts]
def _stats_from_events(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarise a list of event records into counters.

    An event's kind is its ``kind`` field when that is truthy; otherwise it is
    inferred as "interact" if the record has a ``final_result`` key, else
    "perceive" if it has an ``event`` key.

    Args:
        events: Event records to summarise.

    Returns:
        A dict with ``total`` (number of events), ``interact`` and ``perceive``
        (events of those kinds), ``fallback`` (events with a truthy
        ``fallback_used``), ``invalid`` (events whose ``parse_valid`` is
        exactly ``False``), and ``roles`` / ``statuses`` (occurrence counts
        keyed by the stringified non-empty field value).
    """
    role_counts: Dict[str, int] = {}
    status_counts: Dict[str, int] = {}
    kind_counts = {"interact": 0, "perceive": 0}
    fallback_total = 0
    invalid_total = 0

    for event in events:
        # Tally the two label fields with the same rule: falsy values are ignored.
        for field, bucket in (("role", role_counts), ("status", status_counts)):
            label = str(event.get(field) or "")
            if label:
                bucket[label] = bucket.get(label, 0) + 1

        if event.get("fallback_used"):
            fallback_total += 1
        # Only an explicit False counts; a missing or None flag does not.
        if event.get("parse_valid") is False:
            invalid_total += 1

        kind = str(event.get("kind") or "")
        if not kind:
            # No explicit kind: infer it from which payload key is present.
            if "final_result" in event:
                kind = "interact"
            elif "event" in event:
                kind = "perceive"
        if kind in kind_counts:
            kind_counts[kind] += 1

    return {
        "total": len(events),
        "interact": kind_counts["interact"],
        "perceive": kind_counts["perceive"],
        "fallback": fallback_total,
        "invalid": invalid_total,
        "roles": role_counts,
        "statuses": status_counts,
    }
@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Serve ``index.html`` with a cache-busting version on its asset URLs.

    The version is the newest modification time (whole seconds) among
    ``index.html``, ``app.js`` and ``styles.css``; if any of them cannot be
    inspected the version falls back to ``1``.

    Returns:
        The page HTML with ``?v=<version>`` appended to the stylesheet and
        script references.
    """
    page = (ASSETS_DIR / "index.html").read_text(encoding="utf-8")
    try:
        newest_mtime = max(
            (ASSETS_DIR / asset_name).stat().st_mtime
            for asset_name in ("index.html", "app.js", "styles.css")
        )
        version = int(newest_mtime)
    except Exception:
        version = 1
    return page.replace(
        "./assets/styles.css", f"./assets/styles.css?v={version}"
    ).replace(
        "./assets/app.js", f"./assets/app.js?v={version}"
    )
@app.get("/api/sessions")
def list_sessions() -> JSONResponse:
    """List every session directory under ``LOGS_DIR`` with summary figures.

    Each entry reports the directory name (``id``), the sorted names of its
    sub-directories (``roles``), the number of records across its ``*.jsonl``
    files (``events``), the number of those files (``files``), and
    ``updated_at``: the largest non-zero event timestamp, or the directory's
    modification time when there is none.

    Returns:
        ``{"sessions": [...]}`` ordered by ``updated_at``, newest first; the
        list is empty when ``LOGS_DIR`` does not exist.
    """
    if not LOGS_DIR.exists():
        return JSONResponse({"sessions": []})

    summaries = []
    for session_dir in LOGS_DIR.iterdir():
        if not session_dir.is_dir():
            continue
        role_names = sorted(child.name for child in session_dir.iterdir() if child.is_dir())
        log_files = list(session_dir.rglob("*.jsonl"))
        # One timestamp per record, so the list length is also the event count.
        stamps = [
            _ts_value(record.get("ts"))
            for log_file in log_files
            for record in _iter_jsonl(log_file)
        ]
        usable = [stamp for stamp in stamps if stamp]
        newest: Optional[float] = max(usable) if usable else None
        summaries.append(
            {
                "id": session_dir.name,
                "roles": role_names,
                "events": len(stamps),
                "updated_at": newest or session_dir.stat().st_mtime,
                "files": len(log_files),
            }
        )

    summaries.sort(key=lambda summary: summary.get("updated_at", 0), reverse=True)
    return JSONResponse({"sessions": summaries})
@app.get("/api/session/{session_id}")
def session_detail(session_id: str, limit: int = Query(default=5000, ge=1, le=50000)) -> JSONResponse:
    """Return the most recent events of one session together with their stats.

    Args:
        session_id: Name of the session directory.
        limit: Maximum number of events to return; the newest ones are kept.

    Returns:
        A JSON body with ``session_id``, ``events`` (ascending by timestamp)
        and ``stats`` computed over the returned events only.
    """
    # A tail slice longer than the list simply yields every event.
    recent = _collect_events(_safe_session_path(session_id))[-limit:]
    payload = {
        "session_id": session_id,
        "events": recent,
        "stats": _stats_from_events(recent),
    }
    return JSONResponse(payload)
@app.get("/api/session/{session_id}/delta")
def session_delta(
    session_id: str,
    after_ts: Optional[float] = Query(default=None),
    limit: int = Query(default=2000, ge=1, le=20000),
) -> JSONResponse:
    """Return the events of a session newer than ``after_ts``, plus full stats.

    Args:
        session_id: Name of the session directory.
        after_ts: Only events whose timestamp is strictly greater than this
            are returned; ``None`` returns events from the start.
        limit: Maximum number of events to return; the newest ones are kept.

    Returns:
        A JSON body with ``session_id``, ``events`` (the filtered, truncated
        slice) and ``stats`` computed over the whole session.
    """
    session_path = _safe_session_path(session_id)
    # Read the logs once: stats and the delta then describe the same snapshot,
    # and the session files are not parsed a second time per poll.
    all_events = _collect_events(session_path)
    stats = _stats_from_events(all_events)
    events = _filter_after_ts(all_events, after_ts)
    if len(events) > limit:
        events = events[-limit:]
    return JSONResponse(
        {
            "session_id": session_id,
            "events": events,
            "stats": stats,
        }
    )
if __name__ == "__main__":
    # Script entry point: serve the app on localhost only. The port comes from
    # the DEBUG_UI_PORT environment variable and defaults to 8009.
    import uvicorn

    uvicorn.run(
        "debug_ui:app",
        host="127.0.0.1",
        port=int(os.getenv("DEBUG_UI_PORT", "8009")),
        reload=False,
    )