Spaces:
Build error
Build error
| """Safe OpenEnv environment for deterministic Python code repair tasks.""" | |
| from __future__ import annotations | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
| from compat import Environment | |
| from graders import grade_task | |
| from models import ( | |
| HealthResponse, | |
| HistoryEntry, | |
| PythonCodeReviewAction, | |
| PythonCodeReviewObservation, | |
| PythonCodeReviewState, | |
| RewardDetails, | |
| TaskGrade, | |
| ) | |
| from tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids | |
# Reward-shaping constants consumed by PythonCodeReviewEnvironment.compute_reward.
# All values are fractions of the [0, 1] score scale.
INVALID_ACTION_PENALTY = 0.10  # charged for unsupported/malformed actions or stepping a finished episode
NO_PROGRESS_PENALTY = 0.08  # charged when a valid step changes no metric and no code
REPEATED_ACTION_PENALTY = 0.05  # charged when the same action type is issued twice in a row
BASE_STEP_PENALTY = 0.02  # flat per-step efficiency cost
ANALYZE_STEP_PENALTY = 0.01  # extra cost on top of BASE_STEP_PENALTY for read-only analyze_code steps
SUBMIT_COMPLETION_BONUS = 0.30  # bonus for submit_solution with a (near-)perfect score (>= 0.999)
TIMEOUT_PENALTY = 0.12  # charged when grading reports the candidate code timed out
# Action types the environment recognizes; anything else triggers the invalid-action path.
VALID_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"}
| def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: | |
| """Clamp a scalar to a bounded numeric interval.""" | |
| try: | |
| return max(low, min(high, float(value))) | |
| except Exception: | |
| return low | |
| def _safe_text(value: Any, default: str = "") -> str: | |
| """Convert values into short stable strings.""" | |
| try: | |
| text = str(value) | |
| except Exception: | |
| return default | |
| text = " ".join(text.split()) | |
| return text[:240] if text else default | |
class PythonCodeReviewEnvironment(
    Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
    """Deterministic, bounded, evaluator-safe environment for code repair tasks.

    Design invariants visible in this class:
      * No public method raises: every entry point is wrapped so a failure
        degrades to a penalty/fallback observation instead of an exception.
      * Episodes are bounded by ``task.max_steps``; exhausting attempts
        triggers an automatic final submission (``_auto_submit``).
      * Rewards are shaped from metric deltas and clamped to [-1, 1].
    """

    # Each session keeps all of its mutable state on the instance, so
    # separate instances can run concurrently.
    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, verbose: bool = False) -> None:
        super().__init__()
        self._verbose = bool(verbose)
        # Deterministic round-robin ordering of task ids (with fallback).
        self._task_order = self._safe_task_order()
        # -1 so the first _select_task(None) advances to index 0.
        self._task_cursor = -1
        self._task: Optional[TaskSpec] = None
        self._state = PythonCodeReviewState(episode_id=str(uuid4()))
        self._done = False
        self._last_status = "Call reset() to start."
        self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.")
        # Last metric snapshot; compute_reward compares against this.
        self._metrics = self._blank_metrics()
        # Tracks the previous valid action type for the repeat penalty.
        self._last_action_type = ""

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Reset the environment for a deterministic task and return an observation."""
        # Seeding is intentionally ignored: task selection is deterministic.
        del seed
        try:
            # NOTE(review): _reset_rubric is not defined in this class;
            # presumably provided by the Environment base. The broad except
            # makes the call optional — TODO confirm against `compat`.
            self._reset_rubric()
        except Exception:
            pass
        task = self._select_task(task_id)
        self._task = task
        self._done = False
        self._metrics = self._blank_metrics()
        self._last_action_type = ""
        self._last_status = "Inspect the code, run checks, edit the code, then submit."
        self._last_reward = RewardDetails(
            value=0.0,
            reason="Episode reset.",
            prev_score=0.0,
            curr_score=0.0,
        )
        self._state = PythonCodeReviewState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task.task_id,
            difficulty=task.difficulty,
            task_kind=task.task_kind,
            # At least one attempt even if a task declares max_steps <= 0.
            attempts_remaining=max(int(task.max_steps), 1),
            current_code=task.starter_code,
            errors="",
            test_results="No checks run yet.",
            history=[],
            score=0.0,
            done=False,
        )
        return self._build_observation()

    def step(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Execute one safe environment step and always return a valid observation."""
        # Timeouts are enforced inside grading, not at the step boundary.
        del timeout_s
        try:
            if self._task is None:
                # Stepping before reset() implicitly starts an episode.
                return self.reset()
            if self._done:
                # Stepping a finished episode is penalized but never raises.
                self._last_status = "Episode already completed. Call reset() to continue."
                self._last_reward = RewardDetails(
                    value=-INVALID_ACTION_PENALTY,
                    invalid_action_penalty=INVALID_ACTION_PENALTY,
                    reason="Episode already completed.",
                    prev_score=self._metrics["score"],
                    curr_score=self._metrics["score"],
                    code_changed=False,
                )
                return self._build_observation()
            self._state.step_count += 1
            # Missing/garbled action_type degrades to a harmless analyze_code.
            action_type = _safe_text(getattr(action, "action_type", "analyze_code"), "analyze_code")
            code = getattr(action, "code", None)
            if action_type == "analyze_code":
                # Read-only: grade the current code against visible checks only.
                self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=False)
            elif action_type == "run_tests":
                self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=False)
            elif action_type == "edit_code":
                self._handle_edit(code)
            elif action_type == "submit_solution":
                # Final grading includes hidden checks and ends the episode.
                self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=True)
                self._done = True
            else:
                self._apply_invalid_action(f"Unsupported action_type '{action_type}'.")
            self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)
            if self._state.attempts_remaining == 0 and not self._done:
                # Out of attempts: force a final hidden-check evaluation.
                self._auto_submit()
            self._state.done = self._done
            return self._build_observation()
        except Exception as exc:
            # Last-resort guard: convert any unexpected failure into an
            # invalid-action penalty so the caller still gets an observation.
            self._apply_invalid_action(f"Step failure handled: {_safe_text(exc, 'unknown_error')}")
            self._state.done = self._done
            return self._build_observation()

    def state(self) -> PythonCodeReviewState:
        """Return a deep copy of the current environment state."""
        try:
            # Deep copy so callers cannot mutate internal state via history etc.
            return self._state.model_copy(deep=True)
        except Exception:
            return PythonCodeReviewState(episode_id=str(uuid4()))

    def list_task_summaries(self) -> list[object]:
        """Return public task summaries, or an empty list on failure."""
        try:
            return list_task_summaries()
        except Exception:
            return []

    def get_task(self, task_id: str) -> object:
        """Return a single public task descriptor."""
        return self._select_task(task_id).to_descriptor()

    def health(self) -> HealthResponse:
        """Return a simple health response reporting the known task count."""
        return HealthResponse(task_count=len(self._task_order))

    def grade_task_submission(self, task_id: str, code: str) -> TaskGrade:
        """Grade a task submission outside an episode without raising."""
        try:
            task = self._select_task(task_id)
            return self._safe_grade(task=task, candidate_code=code, include_hidden=True)
        except Exception as exc:
            return TaskGrade(score=0.0, details={"error": _safe_text(exc, "grading_failed")})

    def run_tests(self, code: str, include_hidden: bool = False) -> tuple[float, dict[str, int], TaskGrade]:
        """Run deterministic grading and return (score, pass summary, full grade)."""
        # Falls back to round-robin task selection when no episode is active.
        task = self._task or self._select_task(None)
        grade = self._safe_grade(task=task, candidate_code=code, include_hidden=include_hidden)
        return (
            _clamp(grade.score),
            {"passed": int(grade.tests_passed), "total": int(grade.tests_total)},
            grade,
        )

    def apply_action(self, action: PythonCodeReviewAction) -> str:
        """Return the candidate code implied by the action (current code if none)."""
        if getattr(action, "action_type", "") == "edit_code":
            code = getattr(action, "code", None)
            return str(code) if code is not None else self._state.current_code
        return self._state.current_code

    def compute_reward(
        self,
        action_type: str,
        previous_metrics: dict[str, float],
        current_metrics: dict[str, float],
        grade: TaskGrade,
        code_changed: bool,
        invalid_action: bool = False,
    ) -> RewardDetails:
        """Compute a bounded dynamic reward with progress and efficiency shaping.

        The reward is a sum of positive progress terms (score/syntax/test/
        quality deltas, submit bonus) minus penalties (stagnation, regression,
        repeats, per-step cost, invalid action, grading timeout), clamped to
        [-1, 1].
        """
        prev_score = _clamp(previous_metrics.get("score", 0.0))
        curr_score = _clamp(current_metrics.get("score", 0.0))
        score_delta = curr_score - prev_score
        test_delta = current_metrics.get("test_fraction", 0.0) - previous_metrics.get("test_fraction", 0.0)
        syntax_delta = current_metrics.get("syntax_score", 0.0) - previous_metrics.get("syntax_score", 0.0)
        quality_delta = current_metrics.get("quality_score", 0.0) - previous_metrics.get("quality_score", 0.0)
        # Read-only analysis carries a slightly higher efficiency cost.
        step_penalty = BASE_STEP_PENALTY + (ANALYZE_STEP_PENALTY if action_type == "analyze_code" else 0.0)
        repeated_penalty = REPEATED_ACTION_PENALTY if action_type == self._last_action_type else 0.0
        # 1e-9 epsilon guards against float noise counting as progress.
        no_progress = (
            score_delta <= 1e-9
            and test_delta <= 1e-9
            and syntax_delta <= 1e-9
            and quality_delta <= 1e-9
            and not code_changed
        )
        # Invalid actions are penalized separately; don't double-charge stagnation.
        stagnation_penalty = NO_PROGRESS_PENALTY if no_progress and not invalid_action else 0.0
        regression_penalty = max(-score_delta, 0.0) * 0.6 + repeated_penalty + step_penalty
        invalid_penalty = INVALID_ACTION_PENALTY if invalid_action else 0.0
        timeout_penalty = TIMEOUT_PENALTY if bool(grade.timed_out) else 0.0
        progress_reward = max(score_delta, 0.0) * 0.7
        syntax_reward = max(syntax_delta, 0.0) * 0.5
        test_reward = max(test_delta, 0.0) * 1.0
        quality_bonus = max(quality_delta, 0.0) * 0.2
        # 0.999 threshold treats float-rounded perfect scores as complete.
        correctness_bonus = SUBMIT_COMPLETION_BONUS if action_type == "submit_solution" and curr_score >= 0.999 else 0.0
        reward_value = (
            progress_reward
            + syntax_reward
            + test_reward
            + quality_bonus
            + correctness_bonus
            - stagnation_penalty
            - regression_penalty
            - invalid_penalty
            - timeout_penalty
        )
        # Hard bound, with rounding for stable serialization.
        reward_value = max(-1.0, min(1.0, round(reward_value, 6)))
        return RewardDetails(
            value=reward_value,
            syntax_reward=round(syntax_reward, 6),
            test_reward=round(test_reward, 6),
            quality_bonus=round(quality_bonus, 6),
            correctness_bonus=round(correctness_bonus, 6),
            progress_delta=round(progress_reward, 6),
            stagnation_penalty=round(stagnation_penalty, 6),
            regression_penalty=round(regression_penalty, 6),
            invalid_action_penalty=round(invalid_penalty, 6),
            timeout_penalty=round(timeout_penalty, 6),
            reason=f"{action_type} reward computed safely",
            prev_score=round(prev_score, 6),
            curr_score=round(curr_score, 6),
            code_changed=bool(code_changed),
        )

    def _safe_task_order(self) -> list[str]:
        """Load deterministic task ids with a hard fallback."""
        try:
            loaded = list(task_ids())
            if loaded:
                return [str(task_id) for task_id in loaded]
        except Exception:
            pass
        # Hard-coded fallback so the environment stays usable even if the
        # task registry fails to load.
        return ["syntax-fix-easy", "bug-fix-medium", "optimization-hard"]

    def _blank_metrics(self) -> dict[str, float]:
        """Return an all-zero metric snapshot (keys match _metrics_from_grade)."""
        return {
            "score": 0.0,
            "test_fraction": 0.0,
            "syntax_score": 0.0,
            "quality_score": 0.0,
        }

    def _select_task(self, task_id: Optional[str]) -> TaskSpec:
        """Select the requested task or advance the round-robin cursor."""
        try:
            if task_id:
                task = load_task(task_id)
                if task.task_id in self._task_order:
                    # Align cursor so subsequent unnamed resets continue
                    # from the explicitly requested task.
                    self._task_cursor = self._task_order.index(task.task_id)
                return task
        except Exception:
            # Unknown/failed explicit id falls through to round-robin.
            pass
        try:
            self._task_cursor = (self._task_cursor + 1) % len(self._task_order)
            return load_task(self._task_order[self._task_cursor])
        except Exception:
            # Last-resort known-good task id.
            return load_task("syntax-fix-easy")

    def _safe_grade(self, task: TaskSpec, candidate_code: str, include_hidden: bool) -> TaskGrade:
        """Run grading without allowing exceptions to escape."""
        try:
            return grade_task(candidate_code, task, include_hidden=include_hidden)
        except Exception as exc:
            # Synthesize a zero-score grade so callers always get a TaskGrade.
            return TaskGrade(
                score=0.0,
                syntax_score=0.0,
                tests_passed=0,
                tests_total=max(len(task.visible_tests), 1),
                details={"compile_error": "", "error": _safe_text(exc, "grading_failed")},
            )

    def _metrics_from_grade(self, grade: TaskGrade) -> dict[str, float]:
        """Derive normalized [0, 1] reward metrics from a grading result."""
        tests_total = max(int(grade.tests_total), 0)
        tests_passed = max(int(grade.tests_passed), 0)
        # With no tests at all, use syntax score as a proxy for test progress.
        test_fraction = (tests_passed / tests_total) if tests_total else _clamp(grade.syntax_score)
        return {
            "score": _clamp(grade.score),
            "test_fraction": _clamp(test_fraction),
            "syntax_score": _clamp(grade.syntax_score),
            "quality_score": _clamp(grade.quality_score),
        }

    def _format_test_results(self, grade: TaskGrade, include_hidden: bool) -> str:
        """Format test execution results for the observation."""
        compile_error = _safe_text(grade.details.get("compile_error", ""), "")
        scope = "all checks" if include_hidden else "visible checks"
        if compile_error:
            return f"{scope}: compile error: {compile_error}"
        if grade.timed_out:
            return f"{scope}: execution timed out"
        if self._task and self._task.task_kind == "syntax_fix":
            # Syntax-fix tasks have no meaningful pass/fail counts.
            return "visible checks: code compiles successfully"
        return f"{scope}: {int(grade.tests_passed)}/{int(grade.tests_total)} passing"

    def _build_status(self, action_type: str, grade: TaskGrade) -> str:
        """Build a human-readable status message for the last action."""
        if action_type == "submit_solution":
            return f"Solution submitted. Final score: {_clamp(grade.score):.3f}"
        if action_type == "edit_code":
            if grade.details.get("compile_error"):
                return "Code updated, but syntax issues remain."
            return "Code updated and evaluated."
        if action_type == "run_tests":
            return "Test run completed."
        if action_type == "analyze_code":
            return "Analysis completed."
        return "Action handled safely."

    def _apply_grade_to_state(self, grade: TaskGrade, include_hidden: bool) -> None:
        """Update environment state from the latest grading result."""
        compile_error = _safe_text(grade.details.get("compile_error", ""), "")
        self._state.score = _clamp(grade.score)
        self._state.errors = compile_error
        self._state.test_results = self._format_test_results(grade, include_hidden=include_hidden)

    def _handle_scored_action(self, action_type: str, candidate_code: str, include_hidden: bool) -> None:
        """Grade code, update state, and compute reward for a valid action."""
        task = self._task or self._select_task(None)
        # Snapshot metrics BEFORE grading; reward shaping compares deltas.
        previous_metrics = dict(self._metrics)
        prior_code = self._state.current_code
        # Whitespace-only edits do not count as code changes.
        code_changed = candidate_code.strip() != prior_code.strip()
        if action_type == "edit_code":
            self._state.current_code = candidate_code
        grade = self._safe_grade(task=task, candidate_code=self._state.current_code, include_hidden=include_hidden)
        current_metrics = self._metrics_from_grade(grade)
        self._apply_grade_to_state(grade, include_hidden=include_hidden)
        self._last_reward = self.compute_reward(
            action_type=action_type,
            previous_metrics=previous_metrics,
            current_metrics=current_metrics,
            grade=grade,
            code_changed=code_changed,
            invalid_action=False,
        )
        self._last_status = self._build_status(action_type, grade)
        self._metrics = current_metrics
        # Recorded after reward computation so the repeat penalty compares
        # against the PREVIOUS action, not this one.
        self._last_action_type = action_type
        self._append_history(action_type, self._last_status, self._last_reward.value)

    def _handle_edit(self, code: Optional[str]) -> None:
        """Validate edit input and evaluate the new candidate code."""
        safe_code = (code or "").strip()
        if not safe_code:
            # Empty edits are treated as invalid rather than wiping the code.
            self._apply_invalid_action("edit_code requires code parameter.")
            return
        self._handle_scored_action(action_type="edit_code", candidate_code=safe_code, include_hidden=False)

    def _apply_invalid_action(self, reason: str) -> None:
        """Record an invalid action without crashing the episode."""
        previous_metrics = dict(self._metrics)
        # Synthetic grade mirroring current metrics so no progress is implied.
        grade = TaskGrade(score=previous_metrics["score"], syntax_score=previous_metrics["syntax_score"])
        self._last_reward = self.compute_reward(
            action_type="invalid",
            previous_metrics=previous_metrics,
            current_metrics=previous_metrics,
            grade=grade,
            code_changed=False,
            invalid_action=True,
        )
        self._last_status = reason
        # History entries require a valid action type; "analyze_code" is the
        # neutral stand-in for invalid actions.
        self._append_history("analyze_code", reason, self._last_reward.value)

    def _auto_submit(self) -> None:
        """Finalize the episode (hidden checks included) when attempts run out."""
        task = self._task or self._select_task(None)
        grade = self._safe_grade(task=task, candidate_code=self._state.current_code, include_hidden=True)
        self._apply_grade_to_state(grade, include_hidden=True)
        self._done = True
        self._state.done = True
        self._last_status = f"Auto-submitted. Final score: {_clamp(grade.score):.3f}"

    def _append_history(self, action_type: str, status: str, reward: float) -> None:
        """Append one action record to the episode history (best-effort)."""
        try:
            # Normalize unexpected types so HistoryEntry validation passes.
            stable_action = action_type if action_type in VALID_ACTIONS else "analyze_code"
            self._state.history.append(
                HistoryEntry(
                    step=max(int(self._state.step_count), 0),
                    action_type=stable_action,
                    status=_safe_text(status, "handled"),
                    reward=float(reward),
                )
            )
        except Exception:
            # History is informational; never let it break a step.
            pass

    def _build_observation(self) -> PythonCodeReviewObservation:
        """Build a valid observation from current state, with a hard fallback."""
        task = self._task
        try:
            return PythonCodeReviewObservation(
                task_id=self._state.task_id or "",
                title=task.title if task else "",
                difficulty=self._state.difficulty or "easy",
                task_kind=self._state.task_kind,
                task_description=task.task_description if task else "",
                current_code=self._state.current_code,
                errors=self._state.errors,
                test_results=self._state.test_results,
                visible_tests=list(task.visible_tests) if task else [],
                history=list(self._state.history),
                attempts_remaining=max(int(self._state.attempts_remaining), 0),
                last_action_status=self._last_status,
                score=_clamp(self._state.score),
                reward_details=self._last_reward,
                reward=self._last_reward.value,
                done=bool(self._state.done),
                metadata={
                    "prev_score": self._last_reward.prev_score,
                    "curr_score": self._last_reward.curr_score,
                },
            )
        except Exception as exc:
            # Minimal observation that is always constructible; surfaces the
            # failure in `errors` instead of raising.
            return PythonCodeReviewObservation(
                task_id=self._state.task_id or "",
                title="",
                difficulty="easy",
                task_kind=None,
                task_description="",
                current_code=getattr(self._state, "current_code", ""),
                errors=_safe_text(exc, "observation_build_failed"),
                test_results="visible checks: unavailable",
                visible_tests=[],
                history=[],
                attempts_remaining=0,
                last_action_status="Observation fallback returned safely.",
                score=0.0,
                reward_details=RewardDetails(value=0.0, reason="Observation fallback."),
                reward=0.0,
                done=bool(getattr(self._state, "done", False)),
                metadata={},
            )
# Backwards-compatible aliases so older import paths keep resolving.
PythonEnvironment = PythonCodeReviewEnvironment
CodeReviewEnvironment = PythonCodeReviewEnvironment