Spaces:
Running
Running
| """Core environment implementation for Code Review OpenEnv.""" | |
| from __future__ import annotations | |
| from typing import Dict, List, Optional, Tuple | |
| from env.models import CodeReviewAction, CodeReviewObservation, ReviewComment | |
| from env.reward_engine import RewardEngine | |
| from env.state_manager import StateManager | |
| from env.tasks.task_easy import get_task as get_easy | |
| from env.tasks.task_hard import get_task as get_hard | |
| from env.tasks.task_medium import get_task as get_medium | |
| class CodeReviewEnv: | |
| """Gym-like environment for evaluating code-review agents.""" | |
| def __init__(self) -> None: | |
| """Initialize environment with no active episode.""" | |
| self._task_id: str | None = None | |
| self._max_steps: int = 0 | |
| self._pr_title: str = "" | |
| self._pr_description: str = "" | |
| self._full_file: str = "" | |
| self._code_diff: str = "" | |
| self._ground_truth = [] | |
| self._state: StateManager | None = None | |
| self._reward_engine: RewardEngine | None = None | |
| # Multi-file repository support | |
| self._repository_files: Optional[Dict[str, str]] = None | |
| self._available_files: Optional[List[str]] = None | |
| def reset(self, task_id: str) -> CodeReviewObservation: | |
| """Reset the environment to a fresh episode for the given task. | |
| Args: | |
| task_id: One of "easy", "medium", "hard". | |
| Returns: | |
| Initial observation with empty existing_comments. | |
| """ | |
| if task_id == "easy": | |
| task = get_easy() | |
| elif task_id == "medium": | |
| task = get_medium() | |
| elif task_id == "hard": | |
| task = get_hard() | |
| else: | |
| raise ValueError(f"Unknown task_id: {task_id}") | |
| self._task_id = task.task_id | |
| self._max_steps = task.max_steps | |
| self._pr_title = task.pr_title | |
| self._pr_description = task.pr_description | |
| self._full_file = task.full_file | |
| self._code_diff = task.code_diff | |
| self._ground_truth = task.ground_truth | |
| # Store repository files if available | |
| self._repository_files = getattr(task, 'repository_files', None) | |
| self._available_files = getattr(task, 'available_files', None) | |
| self._state = StateManager(task_id=task.task_id) | |
| self._reward_engine = RewardEngine(task_id=task.task_id, ground_truth=task.ground_truth, max_steps=task.max_steps) | |
| return CodeReviewObservation( | |
| task_id=task.task_id, | |
| language="python", | |
| pr_title=self._pr_title, | |
| pr_description=self._pr_description, | |
| code_diff=self._code_diff, | |
| full_file=self._full_file, | |
| existing_comments=[], | |
| step_number=1, | |
| max_steps=self._max_steps, | |
| review_status="pending", | |
| repository_files=self._repository_files, | |
| available_files=self._available_files, | |
| ) | |
| def step(self, action: CodeReviewAction) -> Tuple[CodeReviewObservation, float, bool, dict]: | |
| """Apply an action and advance the environment by one step. | |
| Args: | |
| action: CodeReviewAction describing the agent's operation. | |
| Returns: | |
| Tuple of (updated_observation, reward, done, info). | |
| """ | |
| if self._state is None or self._reward_engine is None or self._task_id is None: | |
| raise RuntimeError("Environment must be reset() before step().") | |
| error: str | None = None | |
| reward: float | |
| new_comment: ReviewComment | None = None | |
| # Handle inspect_file action | |
| if action.operation == "inspect_file": | |
| if self._repository_files and action.filename and action.filename in self._repository_files: | |
| outcome = self._reward_engine.compute( | |
| action, | |
| comments_so_far=self._state.comments, | |
| correctly_identified_bug_lines=self._state.correctly_identified_bug_lines, | |
| step_number=self._state.step_number, | |
| steps_used_after_this=self._state.step_number, | |
| ) | |
| reward = outcome.reward | |
| self._state.record_action(action, reward, error=None) | |
| else: | |
| reward = 0.0 | |
| error = f"File not found: {action.filename}" | |
| self._state.record_action(action, reward, error=error) | |
| # Handle inspect_lines action | |
| elif action.operation == "inspect_lines": | |
| if action.start_line is not None and action.end_line is not None: | |
| if action.end_line - action.start_line > 40: | |
| reward = 0.0 | |
| error = "inspect_lines max range is 40 lines" | |
| self._state.record_action(action, reward, error=error) | |
| elif self._repository_files and action.filename and action.filename in self._repository_files: | |
| outcome = self._reward_engine.compute( | |
| action, | |
| comments_so_far=self._state.comments, | |
| correctly_identified_bug_lines=self._state.correctly_identified_bug_lines, | |
| step_number=self._state.step_number, | |
| steps_used_after_this=self._state.step_number, | |
| ) | |
| reward = outcome.reward | |
| self._state.record_action(action, reward, error=None) | |
| else: | |
| reward = 0.0 | |
| error = f"File not found: {action.filename}" | |
| self._state.record_action(action, reward, error=error) | |
| else: | |
| reward = 0.0 | |
| error = "inspect_lines requires start_line and end_line" | |
| self._state.record_action(action, reward, error=error) | |
| elif action.operation == "add_comment": | |
| if action.line_number is None: | |
| outcome = self._reward_engine.compute( | |
| action, | |
| comments_so_far=self._state.comments, | |
| correctly_identified_bug_lines=self._state.correctly_identified_bug_lines, | |
| step_number=self._state.step_number, | |
| steps_used_after_this=self._state.step_number, | |
| ) | |
| reward = outcome.reward | |
| error = "Missing line_number for add_comment" | |
| self._state.record_action( | |
| action, | |
| reward, | |
| new_comment=None, | |
| correctly_identified_bug_line=None, | |
| is_false_positive=True, | |
| is_red_herring_flag=False, | |
| error=error, | |
| confidence_modifier=outcome.confidence_modifier, | |
| ) | |
| else: | |
| new_comment = ReviewComment( | |
| line_number=action.line_number, | |
| severity=action.severity or "minor", | |
| category=action.category or "bug", | |
| message=action.message or "Issue detected", | |
| step_added=self._state.step_number, | |
| ) | |
| outcome = self._reward_engine.compute( | |
| action, | |
| comments_so_far=self._state.comments + [new_comment], | |
| correctly_identified_bug_lines=self._state.correctly_identified_bug_lines, | |
| step_number=self._state.step_number, | |
| steps_used_after_this=self._state.step_number, | |
| ) | |
| reward = outcome.reward | |
| self._state.record_action( | |
| action, | |
| reward, | |
| new_comment=new_comment, | |
| correctly_identified_bug_line=outcome.correctly_identified_bug_line, | |
| is_false_positive=outcome.is_false_positive, | |
| is_red_herring_flag=outcome.is_red_herring_flag, | |
| error=None, | |
| confidence_modifier=outcome.confidence_modifier, | |
| explanation_depth=outcome.explanation_depth, | |
| ) | |
| else: | |
| outcome = self._reward_engine.compute( | |
| action, | |
| comments_so_far=self._state.comments, | |
| correctly_identified_bug_lines=self._state.correctly_identified_bug_lines, | |
| step_number=self._state.step_number, | |
| steps_used_after_this=self._state.step_number, | |
| ) | |
| reward = outcome.reward | |
| self._state.record_action(action, reward, error=None) | |
| done = False | |
| if action.operation in {"done", "approve", "request_changes"}: | |
| done = True | |
| if self._state.step_number > self._max_steps: | |
| done = True | |
| if action.operation != "done": | |
| self._state.cumulative_reward += -0.20 | |
| # On terminal actions, replace cumulative_reward with the final F1 | |
| # grader score. The per-step shaped rewards guided the agent during | |
| # the episode, but the final reported score must be the deterministic | |
| # F1 result — NOT the sum of shaped rewards + F1. | |
| if done and action.operation in {"done", "approve", "request_changes"}: | |
| final_f1 = outcome.final_score if hasattr(outcome, "final_score") and outcome.final_score is not None else None | |
| if final_f1 is not None: | |
| # Replace, don't add — this is the fix for the 0.999 ceiling bug. | |
| self._state.cumulative_reward = final_f1 | |
| else: | |
| # approve/request_changes or done without final_score — run grader. | |
| final_f1 = self._reward_engine._grade(self._state.comments) | |
| self._state.cumulative_reward = final_f1 | |
| # Compute injection resistance at episode end for hard task | |
| if done and self._task_id == "hard": | |
| # The injected lines are the real bug lines that have adversarial comments above them | |
| # ECB bug (line 35) and race condition bug (line 47) | |
| injected_lines = [35, 47] | |
| self._state.compute_injection_resistance(self._ground_truth, injected_lines) | |
| # Clamp cumulative score to (0.0, 1.0) per OpenEnv strictly between bounds spec. | |
| clamped_score = max(0.001, min(0.999, self._state.cumulative_reward)) | |
| info = { | |
| "bugs_found": len(self._state.correctly_identified_bug_lines), | |
| "false_positives": self._state.get_false_positive_count(), | |
| "current_score": clamped_score, | |
| "error": error, | |
| } | |
| obs = CodeReviewObservation( | |
| task_id=self._task_id, | |
| language="python", | |
| pr_title=self._pr_title, | |
| pr_description=self._pr_description, | |
| code_diff=self._code_diff, | |
| full_file=self._full_file, | |
| existing_comments=list(self._state.comments), | |
| step_number=max(1, self._state.step_number), | |
| max_steps=self._max_steps, | |
| review_status="submitted" if done else "in_review", | |
| repository_files=self._repository_files, | |
| available_files=self._available_files, | |
| ) | |
| return obs, float(round(min(max(reward, 0.01), 0.99), 3)), bool(done), info | |
| def state(self) -> dict: | |
| """Return full current state as a plain dict.""" | |
| if self._state is None: | |
| return {"task_id": None, "step_number": 0, "comments": [], "running_score": 0.01, "bugs_found": 0, "false_positives": 0} | |
| return self._state.to_dict() | |