Spaces:
Running
Running
| """ | |
| inference.py TraceRL OpenEnv Baseline Agent | |
| ============================================== | |
| MetaEnv Round 1 competition-compliant inference script. | |
| MANDATORY ENV VARS (injected by grading server): | |
| API_BASE_URL The LLM API endpoint (OpenAI-compatible) | |
| MODEL_NAME The model identifier for inference | |
| HF_TOKEN Hugging Face / API key | |
| STDOUT FORMAT (machine-parsed by automated validator): | |
| [START] task=<task_name> env=<benchmark> model=<model_name> | |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> rewards=<r1,r2,...,rn> | |
| RULES: | |
| - reward / rewards always formatted to exactly 2 decimal places (f"{reward:.2f}") | |
| - done / success always lowercase boolean strings: true / false | |
| - error is raw last_action_error string, or null if none | |
| - One [START], one [STEP] per env.step(), one [END] always even on exception | |
| """ | |
| import os | |
| import sys | |
| import textwrap | |
| import difflib | |
| import re | |
| import time | |
| from typing import List, Optional | |
| from openai import OpenAI | |
| import unidiff | |
| from client import CodeFixerEnv | |
| from models import CodeFixerAction | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def _enable_hermetic_runtime() -> None: | |
| """ | |
| Keep this process isolated from parent/global Python environment leakage. | |
| """ | |
| if os.getenv("HERMETIC_RUN", "1").strip().lower() not in {"1", "true", "yes"}: | |
| return | |
| os.environ["PYTHONPATH"] = "" | |
| os.environ["PYTHONNOUSERSITE"] = "1" | |
| os.environ["PYTHONDONTWRITEBYTECODE"] = "1" | |
| os.environ.pop("PYTHONHOME", None) | |
# Isolate the interpreter environment before anything else reads it.
_enable_hermetic_runtime()

# --- LLM endpoint and sampling settings (all overridable via env vars) ---
API_BASE_URL = os.environ.get("API_BASE_URL", "https://integrate.api.nvidia.com/v1")
# HF_TOKEN takes precedence; API_KEY is the fallback credential.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
MODEL_NAME = os.environ.get("MODEL_NAME", "qwen/qwen2.5-coder-32b-instruct")
MAX_STEPS = int(os.environ.get("MAX_STEPS", "10"))
TEMPERATURE = float(os.environ.get("TEMPERATURE", "0.7"))
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", "512"))
SUCCESS_SCORE_THRESHOLD = float(os.environ.get("SUCCESS_THRESHOLD", "0.5"))
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))

# --- Task / environment selection ---
_DIFFICULTIES = ["easy", "medium", "hard"]
TASK_NAME = os.environ.get("TRACERL_TASK", "").strip().lower()
BENCHMARK = os.environ.get("TRACERL_BENCHMARK", "rl-code-fix-env")
ENV_URL = os.environ.get("ENV_URL", "http://localhost:8000")

# Only supply a task source when the caller has not chosen one.
os.environ.setdefault("TASK_SOURCE", "swebench")

# An unrecognised task name is reported on stderr and cleared.
if TASK_NAME and TASK_NAME not in _DIFFICULTIES:
    print(
        f"[DEBUG] Invalid TRACERL_TASK='{TASK_NAME}'. Falling back to all tasks.",
        flush=True, file=sys.stderr,
    )
    TASK_NAME = ""

# Shared OpenAI-compatible client used for every completion request.
llm = OpenAI(api_key=API_KEY, base_url=API_BASE_URL)
# System message sent with every completion request. It instructs the model
# to answer with corrected code only (or the literal NO_CHANGE); the text is
# dedented and stripped once at import time.
SYSTEM_PROMPT = textwrap.dedent("""
You are an autonomous bug-fixing agent. You will be given a Python function or module that contains one or more bugs.
Your task is to produce corrected Python code that fixes all bugs.
Output format — you MUST follow this exactly:
- Return ONLY the corrected code, nothing else.
- Do NOT include markdown fences (```), prose, or explanations.
- If the code has no bugs, output exactly: NO_CHANGE
Code rules:
- Preserve the original function signature exactly.
- Do not add new imports unless strictly required by the fix.
- Do not add print statements, debug code, or comments that were not in the original.
- Apply the minimal fix needed. Do not refactor, rename, or reformat unrelated lines.
Think step-by-step internally, but your final output must be ONLY corrected code or NO_CHANGE.
""").strip()
| def _build_user_prompt(observation: dict, history: List[str], step: int) -> str: | |
| """Build the per-step prompt from the current environment observation.""" | |
| code = observation.get("code", "(no code provided)") | |
| test_score = observation.get("test_score", 0.0) | |
| total_tests = observation.get("total_tests", 1) | |
| error_logs = observation.get("logs", "") | |
| test_status = "PASSING" if test_score >= 1.0 else "FAILING" | |
| parts = [ | |
| f"Step: {step}", | |
| f"Tests Status: {test_status} (score {test_score:.2f}/{total_tests})", | |
| "", | |
| "=== Current Code ===", | |
| code, | |
| "", | |
| ] | |
| if error_logs: | |
| parts += ["=== Error Logs ===", error_logs, ""] | |
| # FIX: include actual previous patch attempts so LLM learns from history | |
| if history: | |
| parts += ["=== Previous Attempts (last 3) ==="] | |
| parts += history[-3:] | |
| parts += [""] | |
| parts += [ | |
| "Your task: provide the corrected full code for the snippet above.", | |
| "Return ONLY corrected code (or NO_CHANGE), no explanations or markdown.", | |
| ] | |
| return "\n".join(parts) | |
def log_start(task: str, env: str, model: str) -> None:
    """Print the single ``[START]`` record for an episode to stdout."""
    fields = ("[START]", f"task={task}", f"env={env}", f"model={model}")
    print(" ".join(fields), flush=True)
def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error: Optional[str],
) -> None:
    """Print one ``[STEP]`` record to stdout on a single physical line.

    Args:
        step: Step number within the episode.
        action: Action text; newlines are escaped as ``\\n`` and carriage
            returns dropped.
        reward: Reward for the step, printed with two decimals.
        done: Whether the episode finished; printed as ``true``/``false``.
        error: Error message for the step, or ``None``/empty for ``null``.
    """

    def _single_line(text: str) -> str:
        # The record is parsed line by line, so no field may contain a raw
        # line break.
        return text.replace("\n", "\\n").replace("\r", "")

    done_str = "true" if done else "false"
    # The error gets the same escaping as the action: a multi-line message
    # (e.g. a traceback) would otherwise split the record across lines.
    error_str = _single_line(str(error)) if error else "null"
    action_str = _single_line(action)
    print(
        f"[STEP] step={step} action={action_str} "
        f"reward={reward:.2f} done={done_str} error={error_str}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the single ``[END]`` record for an episode to stdout.

    ``score`` is accepted but not part of the emitted line; rewards are
    comma-separated with two decimals each.
    """
    outcome = str(bool(success)).lower()
    reward_list = ",".join(format(value, ".2f") for value in rewards)
    line = f"[END] success={outcome} steps={steps} rewards={reward_list}"
    print(line, flush=True)
# Lazily created environment client shared by every call in this process.
_env_client: Optional[CodeFixerEnv] = None


def _get_client() -> CodeFixerEnv:
    """Return the shared environment client, creating it on first use."""
    global _env_client
    if _env_client is not None:
        return _env_client
    _env_client = CodeFixerEnv(base_url=ENV_URL)
    return _env_client
def env_reset() -> dict:
    """Reset the environment and return its first observation as plain dicts.

    Returns:
        ``{"observation": {...}, "done": bool}`` where the observation holds
        ``code``, ``logs``, ``test_score``, ``total_tests`` and ``steps``
        copied from the environment's observation object. ``done`` is read
        from the observation and defaults to ``False`` when absent.
    """
    obs = _get_client().reset().observation
    field_names = ("code", "logs", "test_score", "total_tests", "steps")
    snapshot = {name: getattr(obs, name) for name in field_names}
    return {"observation": snapshot, "done": getattr(obs, "done", False)}
def env_step(action_type: str, payload: str = "") -> dict:
    """Send one action to the environment and return the result as plain dicts.

    Any exception raised while stepping is converted into a result with a
    blank observation, zero reward, ``done=False`` and the exception text in
    ``error``; nothing is re-raised.
    """
    field_names = ("code", "logs", "test_score", "total_tests", "steps")
    try:
        client = _get_client()
        outcome = client.step(CodeFixerAction(type=action_type, payload=payload))
        obs = outcome.observation
        snapshot = {name: getattr(obs, name) for name in field_names}
        return {
            "observation": snapshot,
            "reward": outcome.reward,
            "done": outcome.done,
            "error": None,
        }
    except Exception as exc:
        # Placeholder observation used when the step could not be completed.
        blank = dict(zip(field_names, ("", "", 0.0, 1, 0)))
        return {
            "observation": blank,
            "reward": 0.0,
            "done": False,
            "error": str(exc),
        }
| def _strip_markdown_fences(text: str) -> str: | |
| cleaned = (text or "").strip() | |
| # Remove opening fence with optional language | |
| if cleaned.startswith("```"): | |
| cleaned = re.sub(r"^```[a-zA-Z0-9_-]*\s*\n", "", cleaned) | |
| cleaned = re.sub(r"\n```$", "", cleaned.strip()) | |
| return cleaned.strip() | |
| def _looks_like_unified_diff(text: str) -> bool: | |
| return ("--- " in text and "+++ " in text and "@@ " in text) | |
def _is_valid_unified_diff(text: str) -> bool:
    """Return True when ``text`` parses as a unified diff touching >= 1 file.

    Parsing alone is not sufficient: the parser skips lines it does not
    recognise, so arbitrary non-diff text yields an empty patch set without
    raising. Requiring a non-empty result rejects such input.
    """
    try:
        return len(unidiff.PatchSet(text)) > 0
    except Exception:
        # Any parse failure (malformed headers, hunk length mismatch, ...)
        # simply means the text is not a usable patch.
        return False
| def _build_unified_diff(original: str, revised: str) -> str: | |
| lines = difflib.unified_diff( | |
| original.splitlines(), | |
| revised.splitlines(), | |
| fromfile="a/code.py", | |
| tofile="b/code.py", | |
| lineterm="", | |
| ) | |
| return "\n".join(lines).strip() | |
def _looks_like_python_code(text: str) -> bool:
    """Heuristically decide whether ``text`` is Python source.

    Unified diffs and bare "nothing to fix" verdicts are rejected; otherwise
    the text counts as code when it contains any common Python marker.
    """
    normalized = text.lower().strip()
    if _looks_like_unified_diff(text):
        return False

    # Whole-message verdicts that carry no code at all.
    verdicts = {
        "no_change", "no change", "no changes", "no changes made",
        "the code is correct", "already correct", "no bugs found",
        "no bug", "no bugs", "already fixed",
    }
    if normalized in verdicts:
        return False

    # Substrings that typically only show up in Python source.
    markers = (
        'def ', 'class ', 'import ', 'from ', 'return ',
        'if __name__', 'async def', 'self.', 'print(',
    )
    for marker in markers:
        if marker in text:
            return True
    return False
| def _looks_like_output_value(text: str) -> bool: | |
| """Check if text is just an output value like [[3,1],[4,2]]""" | |
| text = text.strip() | |
| # Looks like a literal value/expression, not code | |
| if text.startswith('[') and text.endswith(']') and '\n' not in text: | |
| # Could be a list/dict literal | |
| if ',' in text or ':' in text: | |
| return True | |
| # Single word or simple expression | |
| if '\n' not in text and len(text.split()) <= 3: | |
| # Check if it's not a function definition | |
| if 'def ' not in text and 'class ' not in text: | |
| return True | |
| return False | |
def _normalize_action(raw_action: str, original_code: str) -> str:
    """Normalize raw LLM output into a unified diff patch string.

    Args:
        raw_action: Text returned by the model (may be fenced, prose, a diff,
            or full replacement code).
        original_code: Current code the patch should apply to.

    Returns:
        A unified diff, or ``""`` when the output means "no change" or cannot
        be interpreted as a diff or Python code.
    """
    cleaned = _strip_markdown_fences(raw_action)
    # Handle empty or NO_CHANGE cases
    if not cleaned:
        return ""
    cleaned_lower = cleaned.lower()
    if cleaned_lower == "no_change":
        return ""

    is_diff = _looks_like_unified_diff(cleaned)
    is_code = _looks_like_python_code(cleaned)

    # Free-text "nothing to fix" verdicts. These are matched as substrings,
    # so they are only trusted for short replies or replies that are neither
    # code nor a diff; otherwise a complete fix would be thrown away merely
    # because a comment or string literal happens to contain such a phrase.
    no_change_phrases = (
        "no change", "no changes", "no changes made",
        "the code is correct", "already correct", "no bugs found",
        "no bug", "no bugs", "already fixed", "no modifications needed",
        "the provided code is already correct", "code appears to be correct",
    )
    is_short_reply = len(cleaned.splitlines()) <= 3
    if is_short_reply or not (is_diff or is_code):
        if any(phrase in cleaned_lower for phrase in no_change_phrases):
            return ""

    # Check if output looks like a raw value (not code) - reject it
    if _looks_like_output_value(cleaned):
        print(f"[DEBUG] Rejected output that looks like a value: {cleaned[:50]}...",
              flush=True, file=sys.stderr)
        return ""

    # Already a unified diff: pass it through only if it parses.
    if is_diff:
        return cleaned if _is_valid_unified_diff(cleaned) else ""

    # Full replacement code: convert it to a diff against the current code.
    if is_code:
        generated = _build_unified_diff(original_code, cleaned)
        return generated if _is_valid_unified_diff(generated) else ""

    # If nothing matches, return empty (invalid output)
    print(f"[DEBUG] Could not parse LLM output as code or diff: {cleaned[:100]}...",
          flush=True, file=sys.stderr)
    return ""
def get_action(observation: dict, history: List[str], step: int) -> str:
    """Ask the LLM for the next patch.

    Makes up to ``MAX_RETRIES`` attempts. Errors whose text contains ``429``
    wait ``5 * 2**attempt`` seconds before the next attempt; other errors are
    retried immediately.

    Returns:
        A unified diff patch string, or ``""`` when every attempt failed.
    """
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": _build_user_prompt(observation, history, step)},
    ]
    final_attempt = MAX_RETRIES - 1

    for attempt in range(MAX_RETRIES):
        try:
            completion = llm.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                temperature=TEMPERATURE,
                max_tokens=MAX_TOKENS,
                stream=False,
            )
            reply = (completion.choices[0].message.content or "").strip()
            return _normalize_action(reply, observation.get("code", ""))
        except Exception as exc:
            print(
                f"[DEBUG] LLM call failed at step {step} (attempt {attempt+1}/{MAX_RETRIES}): {exc}",
                flush=True, file=sys.stderr,
            )
            if attempt == final_attempt:
                # Out of attempts: give up with an empty action.
                return ""
            if "429" in str(exc):
                # Rate limited: back off exponentially before trying again.
                sleep_time = 5 * (2 ** attempt)
                print(
                    f"[DEBUG] Rate limited. Sleeping {sleep_time}s before retry.",
                    flush=True, file=sys.stderr,
                )
                time.sleep(sleep_time)
    return ""
def main() -> None:
    """Run one episode per planned task and emit the [START]/[STEP]/[END] logs.

    For each task the environment is reset, then up to ``MAX_STEPS`` rounds
    of "ask the LLM for a patch, apply it, run the tests" are performed.
    ``[END]`` is printed for every started episode, including when it crashes.
    The shared environment client is closed once all episodes are finished.
    """
    # Use the validated module-level TASK_NAME rather than the raw env var:
    # an invalid or blank TRACERL_TASK is reset to "" at import time and must
    # fall back to every difficulty instead of running one unnamed task.
    task_plan = [TASK_NAME] if TASK_NAME else list(_DIFFICULTIES)
    try:
        for planned_task in task_plan:
            rewards: List[float] = []
            history: List[str] = []
            steps_taken: int = 0
            success: bool = False
            score: float = 0.0
            final_test_score: float = 0.0
            episode_start = time.perf_counter()
            log_start(task=planned_task, env=BENCHMARK, model=MODEL_NAME)
            print(f"[DEBUG] planned_task={planned_task}", flush=True, file=sys.stderr)
            try:
                reset_resp = env_reset()
                observation = reset_resp.get("observation", {})
                done = reset_resp.get("done", False)
                for step in range(1, MAX_STEPS + 1):
                    step_start = time.perf_counter()
                    if done:
                        break
                    # Get patch from LLM
                    action = get_action(observation, history, step)
                    # NOTE(review): each loop iteration makes two env_step
                    # calls (apply_patch, run_tests) but logs one [STEP];
                    # confirm this matches the validator's expectations.
                    # Step 1: apply the patch
                    patch_resp = env_step("apply_patch", action)
                    patch_obs = patch_resp.get("observation", {})
                    patch_error = patch_resp.get("error") or None
                    # Only adopt the new observation when the call succeeded
                    # and actually returned code.
                    if not patch_error and patch_obs.get("code"):
                        observation = patch_obs
                    # Step 2: always run tests after patching to get real reward signal
                    test_resp = env_step("run_tests", "")
                    next_obs = test_resp.get("observation", {})
                    reward = float(test_resp.get("reward", 0.0))
                    done = bool(test_resp.get("done", False))
                    test_error = test_resp.get("error") or None
                    error = patch_error or test_error
                    if not test_error and next_obs.get("code"):
                        observation = next_obs
                    test_score = float(next_obs.get("test_score", 0.0))
                    final_test_score = test_score
                    total_tests = int(next_obs.get("total_tests", 1))
                    test_status = "PASS" if test_score >= 1.0 else "FAIL"
                    raw_logs = (next_obs.get("logs") or "").strip()
                    log_preview = raw_logs.replace("\n", " | ")
                    if len(log_preview) > 300:
                        log_preview = log_preview[:300] + "...(truncated)"
                    print(
                        f"[DEBUG] tests status={test_status} score={test_score:.2f}/{total_tests}",
                        flush=True, file=sys.stderr,
                    )
                    if log_preview:
                        print(f"[DEBUG] test_logs={log_preview}", flush=True, file=sys.stderr)
                    rewards.append(reward)
                    steps_taken = step
                    log_step(step=step, action=action, reward=reward, done=done, error=error)
                    print(
                        f"[DEBUG] step={step} elapsed_s={time.perf_counter() - step_start:.3f}",
                        flush=True, file=sys.stderr,
                    )
                    # Record the attempted patch so the LLM sees what it already tried.
                    history.append(
                        f"Step {step}: patch={'(none)' if not action else action[:120]} | "
                        f"test={test_status} reward={reward:.2f}"
                    )
                    if done:
                        success = final_test_score >= SUCCESS_SCORE_THRESHOLD
                        break
                score = min(max(final_test_score, 0.0), 1.0)
                # When the step budget ran out without the environment
                # reporting done, judge success from the clamped last score.
                if not done:
                    success = score >= SUCCESS_SCORE_THRESHOLD
            except Exception as exc:
                print(f"[DEBUG] Episode crashed: {exc}", flush=True, file=sys.stderr)
                success = False
                score = min(max(final_test_score, 0.0), 1.0)
            finally:
                # Always close the episode record, even after a crash.
                log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
                print(
                    f"[DEBUG] total_elapsed_s={time.perf_counter() - episode_start:.3f}",
                    flush=True, file=sys.stderr,
                )
    finally:
        if _env_client is not None:
            try:
                _env_client.close()
            except Exception as exc:
                # Best-effort cleanup: report the failure but never let it
                # mask the episode results.
                print(
                    f"[DEBUG] Failed to close env client: {exc}",
                    flush=True, file=sys.stderr,
                )


if __name__ == "__main__":
    main()