# Hugging Face Spaces page header (extraction artifact): Space status "Sleeping".
| """ | |
| Inference script for the PostMortem OpenEnv environment. | |
| Runs all 3 scenarios (easy/medium/hard) end-to-end against a containerised env, | |
| calling an LLM via the OpenAI client against a Hugging Face Router endpoint. | |
| Environment variables (all read via os.getenv): | |
| HF_TOKEN -- HuggingFace token used as the OpenAI API key (no default). | |
| API_BASE_URL -- LLM base URL (default: HF Router). | |
| MODEL_NAME -- LLM model id (default: Qwen/Qwen2.5-72B-Instruct). | |
| LOCAL_IMAGE_NAME -- optional; local Docker image tag when using from_docker_image(). | |
| Strict stdout format enforced by the grader: | |
| [START] task=<id> env=<benchmark> model=<model> | |
| [STEP] step=<n> action=<tool:args> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...> | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import textwrap | |
| from typing import List, Optional | |
| from openai import OpenAI | |
| # Support running as `python inference.py` from inside the env dir | |
| # as well as `python -m postmortem_env.inference` from its parent. | |
| try: | |
| from postmortem_env.client import PostmortemEnv | |
| from postmortem_env.models import PostmortemAction | |
| except ModuleNotFoundError: | |
| import os as _os, sys as _sys | |
| _here = _os.path.dirname(_os.path.abspath(__file__)) | |
| _sys.path.insert(0, _os.path.dirname(_here)) | |
| from postmortem_env.client import PostmortemEnv # type: ignore | |
| from postmortem_env.models import PostmortemAction # type: ignore | |
# ---------- Config ----------
# Follows hackathon pre-submission checklist:
#   - Defaults are set only for API_BASE_URL and MODEL_NAME (not HF_TOKEN).
#   - LOCAL_IMAGE_NAME is optional, used when calling from_docker_image().
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")  # deliberately no default; must come from the runner

# Optional — used only when launching the env from a local Docker image.
# Per the submission checklist, defaults are set ONLY for API_BASE_URL and MODEL_NAME.
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
# Image tag used by main() when LOCAL_IMAGE_NAME is unset.
_DEFAULT_IMAGE_FALLBACK = "postmortem_env-env:latest"

BENCHMARK = "postmortem_env"  # env name reported on the [START] line
MAX_STEPS = 12  # hard cap on agent steps per episode
TEMPERATURE = 0.2  # low temperature: we want stable, parseable JSON tool calls
MAX_TOKENS = 220  # a single one-line JSON tool call needs few tokens
SUCCESS_SCORE_THRESHOLD = 0.5  # per-task success threshold (normalized [0,1])
# System prompt: pins the model to a strict one-line JSON tool-call protocol
# and walks it through the 5 ordered sub-goals the environment rewards.
SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an on-call Site Reliability Engineer responding to a live production
    incident. You MUST progress through 5 sub-goals in order to maximise reward:
    1. ack (claim the incident)
    2. scope (declare the affected services)
    3. hypothesize (name the root cause)
    4. mitigate (propose a concrete remediation)
    5. write_status (publish a customer-facing update — ENDS the episode)
    Between ack and scope, issue AT MOST ONE query_logs call per service and
    AT MOST ONE query_metrics call per service, plus any needed query_traces.
    DO NOT repeat the same query. Efficient investigation is rewarded.
    Reply on each turn with a SINGLE JSON object on one line, nothing else:
    {"tool": "<verb>", "args": {...}}
    Valid tools and args:
    - {"tool": "ack", "args": {}}
    - {"tool": "query_logs", "args": {"service": "<name>"}}
    - {"tool": "query_metrics", "args": {"service": "<name>"}}
    - {"tool": "query_traces", "args": {"trace_id": "<id>"}}
    - {"tool": "scope", "args": {"services": ["<name>", ...]}}
    - {"tool": "hypothesize", "args": {"root_cause": "<short sentence with concrete keywords>"}}
    - {"tool": "mitigate", "args": {"action": "<short sentence with a concrete fix>"}}
    - {"tool": "write_status", "args": {"text": "<concise update naming service, cause, and fix>"}}
    When you hypothesize/mitigate/write_status, INCLUDE the specific technical
    keywords you observed in the logs (e.g. 'OOM', 'heap', 'DNS resolver',
    'connection pool'), because the grader scores keyword coverage.
    Reply with ONLY the JSON object. No prose, no markdown fences.
    """
).strip()
# ---------- Strict log format helpers ----------
def log_start(task: str, env: str, model: str) -> None:
    """Emit the grader's [START] line for one task and flush immediately."""
    header = " ".join(["[START]", f"task={task}", f"env={env}", f"model={model}"])
    print(header, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one grader [STEP] line.

    Newlines/carriage returns in *action* and *error* are flattened to spaces
    so the grader's one-line-per-step rule is honoured; a missing error is
    rendered as the literal string ``null``.
    """
    def _flatten(text: str) -> str:
        # The grader parses one physical line per step.
        return text.replace("\n", " ").replace("\r", " ")

    fields = [
        f"step={step}",
        f"action={_flatten(action)}",
        f"reward={reward:.2f}",
        f"done={'true' if done else 'false'}",
        f"error={_flatten(error if error else 'null')}",
    ]
    print("[STEP] " + " ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final grader [END] line summarising the episode."""
    joined = ",".join("%.2f" % r for r in rewards)
    flag = "true" if success else "false"
    line = f"[END] success={flag} steps={steps} score={score:.3f} rewards={joined}"
    print(line, flush=True)
# ---------- Agent ----------
def build_user_prompt(step: int, obs_payload: dict, last_reward: float, queried: dict) -> str:
    """Build the per-turn user prompt from the serialised observation.

    Fixes vs. the previous version:
    - The prompt is assembled with an explicit line list instead of
      ``textwrap.dedent`` on an f-string: when the interpolated
      ``tool_result`` contained multi-line text starting at column 0,
      ``dedent`` found no common prefix and left the whole template indented.
    - ``reward_so_far`` is coerced with ``or 0.0`` before ``:.2f`` formatting,
      so a stored ``None`` no longer raises TypeError.

    Args:
        step: 1-based agent step number.
        obs_payload: dict produced by obs_to_payload().
        last_reward: kept for interface compatibility; the reward shown to the
            model comes from ``obs_payload['reward_so_far']``.
        queried: sets of services/traces already queried, keyed by
            "logs" / "metrics" / "traces", used to discourage repeats.

    Returns:
        The full multi-line prompt string, ending with the next-sub-goal hint.
    """
    subgoals = obs_payload.get("subgoals") or {}
    # Compute the next sub-goal to target (fall back to the final one).
    order = ["acked", "scoped", "hypothesized", "mitigated", "written"]
    nxt = next((g for g in order if not subgoals.get(g, False)), "written")
    hint_map = {
        "acked": "Next: send {\"tool\": \"ack\", \"args\": {}}",
        "scoped": 'Next: gather minimal evidence then send {"tool": "scope", "args": {"services": [...]}}',
        "hypothesized": 'Next: send {"tool": "hypothesize", "args": {"root_cause": "..."}} with concrete keywords from logs',
        "mitigated": 'Next: send {"tool": "mitigate", "args": {"action": "..."}} naming a concrete remediation',
        "written": 'Next: send {"tool": "write_status", "args": {"text": "..."}} to end the episode with a concise status update',
    }
    reward_so_far = float(obs_payload.get("reward_so_far") or 0.0)
    lines = [
        f"Step: {step} Steps remaining: {obs_payload.get('steps_remaining')}",
        f"Task: {obs_payload.get('task_id')}",
        f"Incident brief: {obs_payload.get('task_description')}",
        f"Services available: {obs_payload.get('available_services')}",
        f"Trace ids available: {obs_payload.get('available_trace_ids')}",
        f"Sub-goals: {subgoals}",
        f"Total reward so far: {reward_so_far:.2f}",
        f"Services already queried (logs): {sorted(queried.get('logs', set()))}",
        f"Services already queried (metrics): {sorted(queried.get('metrics', set()))}",
        f"Traces already queried: {sorted(queried.get('traces', set()))}",
        "Last tool result:",
        f"{obs_payload.get('tool_result')}",
        hint_map[nxt],
        "Reply with a single JSON object on one line.",
    ]
    return "\n".join(lines)
def ask_model(client: OpenAI, user_prompt: str) -> dict:
    """Query the LLM for one tool call and parse its single-line JSON reply.

    Any failure (network, API, or unparseable output) falls back to a safe
    ``ack`` action so the episode can always progress.
    """
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        raw = completion.choices[0].message.content or ""
        text = raw.strip()
        # Strip optional code fences.
        if text.startswith("```"):
            text = text.strip("`")
        # Drop a leading "json" language tag if present.
        if text.lower().startswith("json"):
            text = text[4:].strip()
        # Keep only the first {...} span, tolerating surrounding prose.
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end != -1:
            text = text[start : end + 1]
        return json.loads(text)
    except Exception as exc:
        print(f"[DEBUG] Model call failed, defaulting to ack: {exc}", flush=True)
        return {"tool": "ack", "args": {}}
def obs_to_payload(obs) -> dict:
    """Serialise a PostmortemObservation into a plain dict for prompt building.

    Every field is read defensively with getattr so a partially-populated
    observation still yields a complete payload with sensible defaults.
    """
    payload = {}
    payload["task_id"] = getattr(obs, "task_id", "")
    payload["task_description"] = getattr(obs, "task_description", "")
    payload["available_services"] = getattr(obs, "available_services", [])
    payload["available_trace_ids"] = getattr(obs, "available_trace_ids", [])
    payload["subgoals"] = getattr(obs, "subgoals", {})
    payload["reward_so_far"] = getattr(obs, "reward_so_far", 0.0)
    payload["steps_remaining"] = getattr(obs, "steps_remaining", 0)
    payload["tool_result"] = getattr(obs, "tool_result", "")
    return payload
async def run_one_task(client: OpenAI, env: PostmortemEnv) -> None:
    """Run one full episode against the env, emitting [START]/[STEP]/[END] lines.

    Resets the env, then loops up to MAX_STEPS times: builds a prompt from the
    latest observation, asks the LLM for a tool call, steps the env with it,
    and logs the result. The [END] line is emitted from ``finally`` so the
    grader always sees a terminating record even if the episode crashes.

    Args:
        client: OpenAI-compatible client used for every tool-call query.
        env: an already-launched PostmortemEnv instance (one reset per call).
    """
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    task_name = "unknown"
    try:
        result = await env.reset()
        obs = result.observation
        task_name = getattr(obs, "task_id", "unknown") or "unknown"
        log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
        last_reward = 0.0
        done = result.done
        # Per-tool sets of already-issued queries, fed back into the prompt
        # so the model is discouraged from repeating the same lookup.
        queried = {"logs": set(), "metrics": set(), "traces": set()}
        for step in range(1, MAX_STEPS + 1):
            if done:
                break
            payload = obs_to_payload(obs)
            user_prompt = build_user_prompt(step, payload, last_reward, queried)
            tool_call = ask_model(client, user_prompt)
            tool = str(tool_call.get("tool", "ack"))
            # Guard against the model returning a non-dict (or missing) args.
            args = tool_call.get("args", {}) or {}
            if not isinstance(args, dict):
                args = {}
            # Track queries so the prompt can discourage repeats
            if tool == "query_logs":
                queried["logs"].add(args.get("service", ""))
            elif tool == "query_metrics":
                queried["metrics"].add(args.get("service", ""))
            elif tool == "query_traces":
                queried["traces"].add(args.get("trace_id", ""))
            action = PostmortemAction(tool=tool, args=args)
            result = await env.step(action)
            obs = result.observation
            reward = float(result.reward or 0.0)
            done = bool(result.done)
            error = getattr(obs, "last_error", "") or None
            # Compact "tool:{json-args}" form for the single-line [STEP] log.
            action_str = f"{tool}:{json.dumps(args, separators=(',', ':'))}"
            rewards.append(reward)
            steps_taken = step
            last_reward = reward
            log_step(step=step, action=action_str, reward=reward, done=done, error=error)
            if done:
                break
        # Prefer the env's cumulative reward; fall back to our own sum when it
        # is missing/zero, then clamp into the grader's normalized [0, 1].
        score = float(getattr(obs, "reward_so_far", 0.0) or sum(rewards))
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
async def main() -> None:
    """Entry point: launch one containerised env and run all three scenarios.

    The env rotates scenarios on each reset, so three back-to-back calls to
    run_one_task() cover easy/medium/hard. The container is always closed,
    with close() failures downgraded to a [DEBUG] note.
    """
    llm = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    docker_image = LOCAL_IMAGE_NAME or _DEFAULT_IMAGE_FALLBACK
    env = await PostmortemEnv.from_docker_image(docker_image)
    try:
        for _episode in range(3):
            await run_one_task(llm, env)
    finally:
        try:
            await env.close()
        except Exception as close_err:
            print(f"[DEBUG] env.close() error: {close_err}", flush=True)


if __name__ == "__main__":
    asyncio.run(main())