Spaces:
Sleeping
Sleeping
"""
Inference Script — CI/CD Doctor
===================================
Environment variables (only the API key is mandatory; the others are optional or have defaults):
    API_BASE_URL        LLM API endpoint (default: HuggingFace router)
    MODEL_NAME          Model identifier (default: Qwen/Qwen2.5-72B-Instruct)
    HF_TOKEN / API_KEY  API key
    IMAGE_NAME          Docker image name (if using from_docker_image())
    ENV_BASE_URL        Environment server URL used when IMAGE_NAME is not set (default: http://localhost:8000)
    CICD_TASK           Task difficulty: easy | medium | hard (default: easy)
STDOUT FORMAT
    [START] task=<task> env=CI_CD_Doctor model=<model>
    [STEP] step=<n> action=<cmd> reward=<0.00> done=<true|false> stdout=<preview> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<0.00> rewards=<r1,r2,...>
"""
| import os | |
| import asyncio | |
| import textwrap | |
| from typing import List, Optional | |
| from dotenv import load_dotenv | |
| import random | |
| from openai import OpenAI | |
| from client import CiCdDoctorAction, CiCdDoctorEnv | |
# load_dotenv() (python-dotenv) runs before any os.getenv lookup below.
load_dotenv()

# Docker image for the environment; when unset, main() connects over HTTP instead.
IMAGE_NAME = os.getenv("IMAGE_NAME")

# The module docstring documents "HF_TOKEN / API_KEY", so accept either name
# (HF_TOKEN wins). An empty value is treated the same as a missing one.
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
if not API_KEY:
    raise ValueError("HF_TOKEN (or API_KEY) environment variable is required")

API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# NOTE(review): TASK_NAME is read here but not referenced elsewhere in the
# visible code; the tasks actually run come from EPISODES below.
TASK_NAME = os.getenv("CICD_TASK", "easy")
BENCHMARK = "CI_CD_Doctor"

# Per-task step budget; DEFAULT_MAX_STEPS applies to task names not listed.
MAX_STEPS_BY_TASK = {"easy": 10, "medium": 15, "hard": 25}
DEFAULT_MAX_STEPS = 15

# Seed the global RNG so the per-episode seeds are reproducible across runs.
BASE_SEED = 42
random.seed(BASE_SEED)
EPISODES = [
    {"task": t, "seed": random.randint(0, 1000)}
    for t in ["easy", "medium", "medium", "medium", "hard"]
]

# Sampling parameters passed to the chat-completions request.
TEMPERATURE = 0.5
MAX_TOKENS = 300

# SUCCESS_SCORE_THRESHOLD is the fallback for tasks missing from SUCCESS_THRESHOLDS.
SUCCESS_SCORE_THRESHOLD = 0.8
SUCCESS_THRESHOLDS = {"easy": 0.70, "medium": 0.60, "hard": 0.45}

# Reward recorded for a step when the environment call fails.
MIN_REWARD = 0.01
def clamp(value: float, lo: float = 0.01, hi: float = 0.99) -> float:
    """Return ``value`` limited to the closed interval [lo, hi]."""
    # Apply the upper bound first, then the lower bound.
    capped = min(hi, value)
    return max(lo, capped)
def _sanitize(text: str) -> str:
    """Collapse every run of whitespace in ``text`` into a single space.

    Leading and trailing whitespace is dropped; falsy input (``None`` or an
    empty string) yields an empty string.
    """
    if not text:
        return ""
    words = text.split()
    return " ".join(words)
def _format_stdout_for_log(text: str, max_len: int = 500) -> str:
    """Render ``text`` as a single-line, double-quoted preview for the log.

    Backslashes, newlines and carriage returns are escaped. The escaped text
    is then cut to ``max_len`` characters with ``...`` appended when it was
    longer. Falsy input is rendered as an empty quoted string.
    """
    if not text:
        return '""'
    # One pass over the text; each character is mapped independently, which
    # matches escaping backslashes first and line breaks afterwards.
    escape_table = str.maketrans({"\\": "\\\\", "\n": "\\n", "\r": "\\r"})
    rendered = text.translate(escape_table)
    if len(rendered) > max_len:
        rendered = rendered[:max_len] + "..."
    return '"' + rendered + '"'
# System prompt sent as the "system" message on every model request (see
# get_agent_command). It lists the commands the agent may emit, strategy
# hints, and how scoring works. dedent() + strip() remove the literal's
# margin and the surrounding blank lines.
SYSTEM_PROMPT = textwrap.dedent(
    """
You are a DevOps engineer debugging a broken CI/CD pipeline.
Your goal: make the pipeline pass in as few steps as possible.
Available commands (output EXACTLY ONE per turn, nothing else):
cat <filename> — read a file
echo "<text>" >> <filename> — append a line to a file
sed -i 's/old/new/' <filename> — find-replace in a file (replaces ALL occurrences)
pipeline run — run the full pipeline
pipeline logs — show last pipeline logs
pipeline status — show current pass/fail status
diagnose "<description>" — record your theory about the bug (max 2 per episode)
STRATEGY:
- Errors describe SYMPTOMS, not solutions. You may need to read multiple
files to understand the root cause.
- Some bugs manifest in one stage but originate in a different file.
Trace the error to its source.
- Build configuration lives in files like Makefile, Dockerfile, ci.yml,
deploy_config.yml, and service.yaml. When an error says "check your
build configuration", READ these files.
- Editing a file you haven't read will cost you points. Always read first.
- Using 'diagnose' before editing earns bonus points for correct reasoning.
- Understand the ROOT CAUSE before editing. A wrong edit wastes two steps
(the bad edit + undoing it). Think first, act precisely.
- Fix ALL issues you can identify before re-running the pipeline.
- If a fix didn't work, re-read the file to see what actually changed.
SCORING:
- You are scored on PROGRESS, not activity.
- Reading files you already read or re-running the pipeline without changes
will LOWER your score.
- Each step that doesn't advance your understanding or fix the problem costs
you points. Be deliberate.
Output ONLY the raw command string. No explanation, no markdown, no backticks.
"""
).strip()
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line announcing an episode's task, env and model."""
    parts = ("[START]", f"task={task}", f"env={env}", f"model={model}")
    print(" ".join(parts), flush=True)
def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str],
    stdout: Optional[str],
) -> None:
    """Print one ``[STEP]`` line describing a single agent action.

    Args:
        step: 1-based step number.
        action: Command sent to the environment; whitespace is collapsed.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended on this step (printed lower-case).
        error: Error text, or ``None``/empty to print ``null``.
        stdout: Command output, printed as an escaped, quoted preview.
    """
    fields = (
        f"step={step}",
        f"action={_sanitize(action)}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"stdout={_format_stdout_for_log(stdout)}",
        "error=" + (_sanitize(error) if error else "null"),
    )
    print("[STEP] " + " ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` summary line for a finished episode.

    ``score`` and each entry of ``rewards`` are printed with two decimals;
    the rewards are comma-separated with no spaces.
    """
    formatted_rewards = [f"{value:.2f}" for value in rewards]
    success_text = str(success).lower()
    summary = (
        f"[END] success={success_text} steps={steps} "
        f"score={score:.2f} rewards={','.join(formatted_rewards)}"
    )
    print(summary, flush=True)
def build_user_prompt(
    step: int,
    last_stdout: str,
    pipeline_status: str,
    history: List[str],
    errors_seen: List[str],
    files_read: dict,
    edits_made: List[str],
    max_steps: int = DEFAULT_MAX_STEPS,
) -> str:
    """Build the per-step user message describing the session so far.

    Args:
        step: Current 1-based step number.
        last_stdout: Output of the previous command; ``None``/blank is shown
            as ``(no output)``.
        pipeline_status: Latest pipeline status string.
        history: One summary line per past action.
        errors_seen: Distinct error lines collected so far.
        files_read: Mapping of filename to the content last read from it;
            each file is previewed with at most 300 characters.
        edits_made: One line per edit command issued.
        max_steps: Step budget shown to the model.

    Returns:
        The prompt text, one section per line group.
    """
    history_block = "\n".join(history) if history else "None"
    errors_block = "\n".join(f" - {e}" for e in errors_seen) if errors_seen else " (none yet)"
    edits_block = "\n".join(f" - {e}" for e in edits_made) if edits_made else " (none yet)"

    if files_read:
        file_sections = []
        for fname, content in files_read.items():
            preview = (content or "").strip()[:300]
            file_sections.append(f"\n [{fname}]:\n {preview}\n")
        files_block = "".join(file_sections)
    else:
        files_block = " (none yet)"

    last_output = (last_stdout or "").strip() or "(no output)"

    # The prompt is joined line by line instead of dedenting an f-string:
    # dedent() would run after interpolation, and multi-line values (command
    # output, file previews, history) whose lines start at column 0 would
    # cancel the common margin and leave the template lines indented.
    sections = [
        f"Step {step} of {max_steps} | Pipeline: {pipeline_status}",
        "Last command output:",
        last_output,
        "=== SESSION CONTEXT ===",
        "Errors seen so far:",
        errors_block,
        "Files read (latest content):",
        files_block,
        "Edits applied so far:",
        edits_block,
        "Full action history:",
        history_block,
        "What is your next command?",
    ]
    return "\n".join(sections)
def get_agent_command(
    client: OpenAI,
    step: int,
    last_stdout: str,
    pipeline_status: str,
    history: List[str],
    errors_seen: List[str],
    files_read: dict,
    edits_made: List[str],
    max_steps: int = DEFAULT_MAX_STEPS,
) -> str:
    """Ask the model for the next command.

    The session context is rendered with ``build_user_prompt`` and sent along
    with ``SYSTEM_PROMPT``. If the request fails, or the reply is empty, the
    command ``pipeline status`` is returned instead.
    """
    fallback_command = "pipeline status"
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {
            "role": "user",
            "content": build_user_prompt(
                step, last_stdout, pipeline_status, history,
                errors_seen, files_read, edits_made, max_steps=max_steps,
            ),
        },
    ]
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = (response.choices[0].message.content or "").strip()
    except Exception as exc:
        # Best effort: any failure while requesting or reading the reply is
        # logged and replaced by the fallback command.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return fallback_command
    return reply or fallback_command
def _extract_errors(stdout: str) -> List[str]:
    """Return the distinct error lines found in ``stdout``, in first-seen order.

    A line counts as an error line when, after stripping, it starts with
    "ERROR" (case-insensitively) or contains "error" in any letter case.
    """
    stripped_lines = (raw.strip() for raw in stdout.splitlines())
    error_lines = (
        candidate
        for candidate in stripped_lines
        if candidate
        and (candidate.upper().startswith("ERROR") or "error" in candidate.lower())
    )
    # dict keys keep insertion order, giving ordered de-duplication.
    return list(dict.fromkeys(error_lines))
async def run_episode(client: OpenAI, env, task: str, seed: int) -> dict:
    """Run a single episode and summarize it.

    Args:
        client: OpenAI client used to request each command from the model.
        env: Environment object; ``reset(task=..., seed=...)`` and
            ``step(action)`` are awaited on it.
        task: Task name; selects the step budget and the success threshold.
        seed: Seed forwarded to ``env.reset``.

    Returns:
        A dict with keys ``"score"`` (sum of step rewards, clamped),
        ``"steps"``, ``"rewards"`` and ``"success"``.
    """
    history: List[str] = []
    rewards: List[float] = []
    errors_seen: List[str] = []
    files_read: dict = {}
    edits_made: List[str] = []
    steps_taken = 0
    max_steps = MAX_STEPS_BY_TASK.get(task, DEFAULT_MAX_STEPS)

    result = await env.reset(task=task, seed=seed)
    # Normalize a missing stdout to "" so prompt building never sees None.
    last_stdout = result.observation.stdout or ""
    pipeline_status = result.observation.pipeline_status

    for step in range(1, max_steps + 1):
        if result.done:
            break

        command = get_agent_command(
            client, step, last_stdout, pipeline_status, history,
            errors_seen, files_read, edits_made, max_steps=max_steps,
        )

        error: Optional[str] = None
        step_stdout = ""
        done = False
        reward = MIN_REWARD
        try:
            result = await env.step(CiCdDoctorAction(command=command))
            obs = result.observation
            # NOTE(review): guards against a step result without a reward,
            # which would break sum() and the ":.2f" formatting below.
            if result.reward is not None:
                reward = result.reward
            done = result.done
            step_stdout = obs.stdout or ""
            last_stdout = step_stdout
            pipeline_status = obs.pipeline_status

            # Prefix matching is case-sensitive, like the lower-case commands
            # listed in SYSTEM_PROMPT.
            cmd = command.strip()
            if cmd.startswith("pipeline run"):
                for err in _extract_errors(step_stdout):
                    if err not in errors_seen:
                        errors_seen.append(err)
            elif cmd.startswith("cat "):
                fname = cmd[4:].strip()
                files_read[fname] = step_stdout
            elif cmd.startswith(("echo ", "sed ")):
                edits_made.append(f"Step {step}: {command}")
        except Exception as exc:
            # A failed environment call ends the episode with the minimum reward.
            reward = MIN_REWARD
            done = True
            error = f"{type(exc).__name__}: {exc}"

        rewards.append(reward)
        steps_taken = step
        log_step(
            step=step,
            action=command,
            reward=reward,
            done=done,
            error=error,
            stdout=step_stdout,
        )
        history.append(
            f"Step {step}: {command!r} -> status={pipeline_status} reward={reward:+.2f}"
        )
        if done:
            break

    score = clamp(sum(rewards) if rewards else MIN_REWARD)
    threshold = SUCCESS_THRESHOLDS.get(task, SUCCESS_SCORE_THRESHOLD)
    return {
        "score": score,
        "steps": steps_taken,
        "rewards": rewards,
        "success": score >= threshold,
    }
async def main() -> None:
    """Run every configured episode against the environment.

    The environment is started from ``IMAGE_NAME`` when that is set,
    otherwise it is reached at ``ENV_BASE_URL`` (default
    ``http://localhost:8000``). Each episode prints a ``[START]`` line and a
    matching ``[END]`` line. Any error is reported as a ``[DEBUG]`` line, and
    the environment is closed on the way out.
    """
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    env = None
    try:
        if IMAGE_NAME:
            env = await CiCdDoctorEnv.from_docker_image(IMAGE_NAME)
        else:
            base_url = os.getenv("ENV_BASE_URL", "http://localhost:8000")
            env = CiCdDoctorEnv(base_url=base_url)

        for ep in EPISODES:
            task, seed = ep["task"], ep["seed"]
            log_start(task=task, env=BENCHMARK, model=MODEL_NAME)
            # Placeholder reported if the episode raises, so every [START]
            # line still gets its [END] line before the error propagates.
            outcome = {"score": MIN_REWARD, "steps": 0, "rewards": [], "success": False}
            try:
                outcome = await run_episode(client, env, task=task, seed=seed)
            finally:
                log_end(
                    success=outcome["success"],
                    steps=outcome["steps"],
                    score=outcome["score"],
                    rewards=outcome["rewards"],
                )
    except Exception as exc:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"[DEBUG] fatal error: {type(exc).__name__}: {exc}", flush=True)
    finally:
        if env is not None:
            try:
                await env.close()
            except Exception as e:
                print(f"[DEBUG] env.close() error: {e}", flush=True)
| if __name__ == "__main__": | |
| asyncio.run(main()) |