Spaces:
Running
Running
| """ | |
Live HTTP audit for the Gov Workflow OpenEnv API.
This script exercises the full 16-endpoint contract over real HTTP
and writes a timestamped JSON report with a pass/fail verdict and a response sample for each check.
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib import error, request | |
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _shorten(text: str, max_chars: int = 800) -> str: | |
| if len(text) <= max_chars: | |
| return text | |
| return text[:max_chars] + "...<truncated>" | |
def _http_call(
    base_url: str,
    method: str,
    path: str,
    *,
    body: dict[str, Any] | None = None,
    timeout_sec: int = 30,
    max_sample_chars: int = 800,
) -> dict[str, Any]:
    """Perform one HTTP request and describe the outcome as a plain dict.

    Failures raised while opening the URL or reading the response are
    captured in the returned dict instead of being propagated, so one
    broken endpoint cannot abort a sequence of calls.

    Args:
        base_url: Server root; a trailing slash is tolerated.
        method: HTTP verb; upper-cased before sending.
        path: Path (plus optional query string) appended to ``base_url``.
        body: Optional payload, sent as UTF-8 encoded JSON with a
            ``Content-Type: application/json`` header.
        timeout_sec: Timeout in seconds passed to ``urlopen``.
        max_sample_chars: Maximum length of ``response_text_sample`` before
            a truncation marker is appended.

    Returns:
        Dict with the keys ``method``, ``path``, ``url``, ``request_body``,
        ``status_code`` (``None`` when no response arrived), ``ok`` (false
        only when an error occurred and no status code was obtained),
        ``error`` (error text or ``None``), ``response_json`` (decoded body
        or ``None``) and ``response_text_sample`` (possibly truncated body).
    """
    url = f"{base_url.rstrip('/')}{path}"
    payload_bytes = None
    headers = {"Accept": "application/json"}
    if body is not None:
        payload_bytes = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    req = request.Request(
        url=url,
        data=payload_bytes,
        headers=headers,
        method=method.upper(),
    )

    status_code = None
    raw_text = ""
    parsed_json = None
    err_text = None
    try:
        with request.urlopen(req, timeout=timeout_sec) as resp:
            status_code = int(resp.status)
            raw_text = resp.read().decode("utf-8", errors="replace")
    except error.HTTPError as exc:
        # The server answered with an error status; its body is still useful
        # evidence for the report, so read it like a normal response.
        status_code = int(exc.code)
        err_text = str(exc)
        try:
            try:
                raw_text = exc.read().decode("utf-8", errors="replace")
            finally:
                # HTTPError wraps the open response; release the connection.
                exc.close()
        except Exception as read_exc:
            # A failure while draining the error body must not escape this
            # handler: keep the status code and note what went wrong.
            err_text = f"{err_text}; error body not fully read: {read_exc}"
    except Exception as exc:  # network/timeout etc.
        err_text = str(exc)

    if raw_text:
        try:
            parsed_json = json.loads(raw_text)
        except (ValueError, RecursionError):
            # Non-JSON bodies are expected for some responses; callers fall
            # back to the text sample. JSONDecodeError is a ValueError.
            parsed_json = None

    return {
        "method": method.upper(),
        "path": path,
        "url": url,
        "request_body": body,
        "status_code": status_code,
        "ok": err_text is None or status_code is not None,
        "error": err_text,
        "response_json": parsed_json,
        "response_text_sample": _shorten(raw_text, max_chars=max_sample_chars),
    }
| def _extract_sse_data_lines(raw_text: str) -> list[dict[str, Any]]: | |
| rows: list[dict[str, Any]] = [] | |
| for line in raw_text.splitlines(): | |
| line = line.strip() | |
| if not line.startswith("data:"): | |
| continue | |
| payload = line[len("data:") :].strip() | |
| if not payload: | |
| continue | |
| try: | |
| rows.append(json.loads(payload)) | |
| except Exception: | |
| rows.append({"raw": payload}) | |
| return rows | |
def run_audit(base_url: str, timeout_sec: int = 30) -> dict[str, Any]:
    """Exercise every endpoint of the API contract and collect the results.

    Sixteen checks run in a fixed order. Session-scoped checks reuse the
    ``session_id`` returned by ``POST /reset``; when none was returned they
    are recorded as failed with a "Skipped" error instead of being called.

    Args:
        base_url: Root URL of the server under audit.
        timeout_sec: Per-request timeout in seconds.

    Returns:
        Report dict with ``audit_name``, ``generated_at_utc``, ``base_url``,
        ``summary`` (total/passed/failed counts and ``pass_rate`` as a
        percentage), ``context`` (discovered task ids and session id) and
        ``checks`` (one entry per check, in execution order).
    """
    checks: list[dict[str, Any]] = []
    context: dict[str, Any] = {}

    # The /simulate body is parsed, not merely displayed, so it needs a cap
    # far above the display default to keep the stream intact.
    sse_capture_chars = 1_000_000

    def add_check(name: str, call_result: dict[str, Any], expected_statuses: list[int]) -> None:
        """Record a check that passes when the status code is an expected one."""
        status_code = call_result.get("status_code")
        passed = status_code in expected_statuses
        decoded = call_result.get("response_json")
        checks.append(
            {
                "name": name,
                "endpoint": f"{call_result['method']} {call_result['path']}",
                "expected_statuses": expected_statuses,
                "status_code": status_code,
                "passed": passed,
                "error": call_result.get("error"),
                # Prefer the decoded JSON; fall back to the raw text sample.
                "response_sample": decoded
                if decoded is not None
                else call_result.get("response_text_sample"),
            }
        )
        call_result["passed"] = passed

    def add_skipped(name: str, endpoint: str) -> None:
        """Record a session-scoped check that could not run, as a failure."""
        checks.append(
            {
                "name": name,
                "endpoint": endpoint,
                "expected_statuses": [200],
                "status_code": None,
                "passed": False,
                "error": "Skipped: no session_id from /reset",
                "response_sample": None,
            }
        )

    # 1) /health
    r = _http_call(base_url, "GET", "/health", timeout_sec=timeout_sec)
    add_check("health", r, [200])

    # 2) /tasks
    r = _http_call(base_url, "GET", "/tasks", timeout_sec=timeout_sec)
    add_check("tasks", r, [200])
    task_ids: list[str] = []
    if isinstance(r.get("response_json"), list):
        task_ids = [
            str(x.get("task_id"))
            for x in r["response_json"]
            if isinstance(x, dict) and x.get("task_id")
        ]
    if not task_ids:
        # Fall back to the built-in task ids so later checks can still run.
        task_ids = ["district_backlog_easy", "mixed_urgency_medium", "cross_department_hard"]
    context["task_ids"] = task_ids

    # 3) /tasks/{task_id} (test first available)
    task_id = task_ids[0]
    r = _http_call(base_url, "GET", f"/tasks/{task_id}", timeout_sec=timeout_sec)
    add_check("task_detail", r, [200])

    # 4) /metrics
    r = _http_call(base_url, "GET", "/metrics", timeout_sec=timeout_sec)
    add_check("metrics", r, [200])

    # 5) /actions/schema
    r = _http_call(base_url, "GET", "/actions/schema", timeout_sec=timeout_sec)
    add_check("actions_schema", r, [200])

    # 6) /rl/models
    r = _http_call(base_url, "GET", "/rl/models", timeout_sec=timeout_sec)
    add_check("rl_models", r, [200])

    # 7) /reset
    r = _http_call(
        base_url,
        "POST",
        "/reset",
        body={"task_id": task_id, "seed": 42},
        timeout_sec=timeout_sec,
    )
    add_check("reset", r, [200])
    sid = None
    if isinstance(r.get("response_json"), dict):
        sid = r["response_json"].get("session_id")
    context["session_id"] = sid

    # 8) /action-masks
    if sid:
        r = _http_call(
            base_url,
            "POST",
            "/action-masks",
            body={"session_id": sid},
            timeout_sec=timeout_sec,
        )
        add_check("action_masks", r, [200])
    else:
        add_skipped("action_masks", "POST /action-masks")

    # 9) /step
    if sid:
        r = _http_call(
            base_url,
            "POST",
            "/step",
            body={"session_id": sid, "action": {"action_type": "advance_time"}},
            timeout_sec=timeout_sec,
        )
        add_check("step", r, [200])
    else:
        add_skipped("step", "POST /step")

    # 10) /state
    if sid:
        r = _http_call(
            base_url,
            "GET",
            f"/state?session_id={sid}&include_action_history=true",
            timeout_sec=timeout_sec,
        )
        add_check("state", r, [200])
    else:
        add_skipped("state", "GET /state")

    # 11) /simulate (SSE)
    # Parse an effectively untruncated body: a cut-off stream would lose or
    # corrupt the final `done` event and fail the check for the wrong reason.
    r = _http_call(
        base_url,
        "POST",
        "/simulate",
        body={"task_id": task_id, "agent_mode": "baseline_policy", "max_steps": 3, "seed": 42},
        timeout_sec=timeout_sec,
        max_sample_chars=sse_capture_chars,
    )
    parsed_rows = _extract_sse_data_lines(r.get("response_text_sample", ""))
    has_step = any(isinstance(x, dict) and "step" in x for x in parsed_rows)
    has_done = any(isinstance(x, dict) and x.get("done") is True for x in parsed_rows)
    simulate_pass = (r.get("status_code") == 200) and has_step and has_done
    checks.append(
        {
            "name": "simulate_stream",
            "endpoint": "POST /simulate",
            "expected_statuses": [200],
            "status_code": r.get("status_code"),
            "passed": simulate_pass,
            "error": r.get("error"),
            "response_sample": {
                # Only the first few events go into the report.
                "sse_rows_sample": parsed_rows[:3],
                "has_step": has_step,
                "has_done": has_done,
            },
        }
    )

    # 12) /simulate/{session_id}/snapshot
    if sid:
        r = _http_call(base_url, "GET", f"/simulate/{sid}/snapshot", timeout_sec=timeout_sec)
        add_check("simulate_snapshot", r, [200])
    else:
        add_skipped("simulate_snapshot", "GET /simulate/{session_id}/snapshot")

    # 13) /simulate/{session_id}/trace
    if sid:
        r = _http_call(
            base_url,
            "GET",
            f"/simulate/{sid}/trace?page=1&page_size=20",
            timeout_sec=timeout_sec,
        )
        add_check("simulate_trace", r, [200])
    else:
        add_skipped("simulate_trace", "GET /simulate/{session_id}/trace")

    # 14) /grade
    if sid:
        r = _http_call(base_url, "POST", "/grade", body={"session_id": sid}, timeout_sec=timeout_sec)
        add_check("grade", r, [200])
    else:
        add_skipped("grade", "POST /grade")

    # 15) /rl/run (guardrail: missing model -> 422)
    r = _http_call(
        base_url,
        "POST",
        "/rl/run",
        body={
            "task_id": task_id,
            "model_path": "results/best_model/does_not_exist",
            "seed": 42,
            "max_steps": 10,
            "n_episodes": 1,
        },
        timeout_sec=timeout_sec,
    )
    add_check("rl_run_missing_model_guardrail", r, [422])

    # 16) /simulate/{session_id}/cancel
    if sid:
        r = _http_call(base_url, "POST", f"/simulate/{sid}/cancel", timeout_sec=timeout_sec)
        add_check("simulate_cancel", r, [200])
    else:
        add_skipped("simulate_cancel", "POST /simulate/{session_id}/cancel")

    total = len(checks)
    passed = sum(1 for c in checks if c["passed"])
    failed = total - passed
    return {
        "audit_name": "gov-workflow-openenv-live-http-audit",
        "generated_at_utc": _now_iso(),
        "base_url": base_url,
        "summary": {
            "total_checks": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": round((passed / total) * 100.0, 2) if total else 0.0,
        },
        "context": context,
        "checks": checks,
    }
def main() -> int:
    """Parse CLI options, run the audit, and save the report as JSON.

    The report is written to ``<out-dir>/api_live_audit_<stamp>.json``, where
    the stamp is taken from the local wall clock, and a one-line summary is
    printed to stdout.

    Returns:
        Process exit code: ``1`` if any check failed, otherwise ``0``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--base-url", default="http://127.0.0.1:7860")
    cli.add_argument("--timeout-sec", type=int, default=30)
    cli.add_argument("--out-dir", default="reports/api_audit")
    options = cli.parse_args()

    report = run_audit(options.base_url, timeout_sec=options.timeout_sec)
    summary = report["summary"]

    target_dir = Path(options.out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_path = target_dir / f"api_live_audit_{stamp}.json"
    out_path.write_text(json.dumps(report, indent=2), encoding="utf-8")

    print(f"Report written: {out_path}")
    print(
        f"Summary: passed={summary['passed']}, "
        f"failed={summary['failed']}, "
        f"pass_rate={summary['pass_rate']}%"
    )
    # A non-zero exit code lets callers detect a failed audit.
    return 1 if summary["failed"] > 0 else 0


if __name__ == "__main__":
    raise SystemExit(main())