# python_env/server/env_safe.py
"""Safe OpenEnv environment for deterministic Python code repair tasks."""
from __future__ import annotations
from typing import Any, Optional
from uuid import uuid4
from compat import Environment
from graders import grade_task
from models import (
HealthResponse,
HistoryEntry,
PythonCodeReviewAction,
PythonCodeReviewObservation,
PythonCodeReviewState,
RewardDetails,
TaskGrade,
)
from tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids
INVALID_ACTION_PENALTY = 0.10
NO_PROGRESS_PENALTY = 0.08
REPEATED_ACTION_PENALTY = 0.05
BASE_STEP_PENALTY = 0.02
ANALYZE_STEP_PENALTY = 0.01
SUBMIT_COMPLETION_BONUS = 0.30
TIMEOUT_PENALTY = 0.12
VALID_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"}
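# Shape of the shaped reward (a sketch; the exact terms live in
# PythonCodeReviewEnvironment.compute_reward below): a first-try perfect
# submission accrues roughly
#   1.0 * 0.7 (progress) + 1.0 * 1.0 (tests) + 1.0 * 0.5 (syntax)
#   + SUBMIT_COMPLETION_BONUS - BASE_STEP_PENALTY
# and the total is then clamped into [-1.0, 1.0].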
def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float:
"""Clamp a scalar to a bounded numeric interval."""
try:
return max(low, min(high, float(value)))
except Exception:
return low
def _safe_text(value: Any, default: str = "") -> str:
"""Convert values into short stable strings."""
try:
text = str(value)
except Exception:
return default
text = " ".join(text.split())
return text[:240] if text else default
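# Doctest-style sketches of the helpers above (values follow directly from
# the definitions):
#   _clamp(1.7)           -> 1.0
#   _clamp("oops")        -> 0.0   (non-numeric input falls back to `low`)
#   _safe_text("a\n  b")  -> "a b" (whitespace collapsed, capped at 240 chars)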
class PythonCodeReviewEnvironment(
Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
"""Deterministic, bounded, evaluator-safe environment for code repair tasks."""
SUPPORTS_CONCURRENT_SESSIONS = True
def __init__(self, verbose: bool = False) -> None:
super().__init__()
self._verbose = bool(verbose)
self._task_order = self._safe_task_order()
self._task_cursor = -1
self._task: Optional[TaskSpec] = None
self._state = PythonCodeReviewState(episode_id=str(uuid4()))
self._done = False
self._last_status = "Call reset() to start."
self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.")
self._metrics = self._blank_metrics()
self._last_action_type = ""
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
task_id: Optional[str] = None,
**_: object,
) -> PythonCodeReviewObservation:
"""Reset the environment for a deterministic task and return an observation."""
del seed
        # _reset_rubric is treated as an optional hook (presumably provided by
        # the compat Environment base class); failures are deliberately ignored.
        try:
            self._reset_rubric()
        except Exception:
            pass
task = self._select_task(task_id)
self._task = task
self._done = False
self._metrics = self._blank_metrics()
self._last_action_type = ""
self._last_status = "Inspect the code, run checks, edit the code, then submit."
self._last_reward = RewardDetails(
value=0.0,
reason="Episode reset.",
prev_score=0.0,
curr_score=0.0,
)
self._state = PythonCodeReviewState(
episode_id=episode_id or str(uuid4()),
step_count=0,
task_id=task.task_id,
difficulty=task.difficulty,
task_kind=task.task_kind,
attempts_remaining=max(int(task.max_steps), 1),
current_code=task.starter_code,
errors="",
test_results="No checks run yet.",
history=[],
score=0.0,
done=False,
)
return self._build_observation()
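    # Usage sketch (assumes the serving layer constructs this class directly;
    # `task_id` must match an id from tasks.task_ids()):
    #   env = PythonCodeReviewEnvironment()
    #   obs = env.reset(task_id="syntax-fix-easy")
    #   obs.current_code  # starter code for the selected task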
def step(
self,
action: PythonCodeReviewAction,
timeout_s: Optional[float] = None,
**_: object,
) -> PythonCodeReviewObservation:
"""Execute one safe environment step and always return a valid observation."""
del timeout_s
try:
if self._task is None:
return self.reset()
if self._done:
self._last_status = "Episode already completed. Call reset() to continue."
self._last_reward = RewardDetails(
value=-INVALID_ACTION_PENALTY,
invalid_action_penalty=INVALID_ACTION_PENALTY,
reason="Episode already completed.",
prev_score=self._metrics["score"],
curr_score=self._metrics["score"],
code_changed=False,
)
return self._build_observation()
self._state.step_count += 1
action_type = _safe_text(getattr(action, "action_type", "analyze_code"), "analyze_code")
code = getattr(action, "code", None)
            if action_type in ("analyze_code", "run_tests"):
                self._handle_scored_action(
                    action_type=action_type,
                    candidate_code=self._state.current_code,
                    include_hidden=False,
                )
            elif action_type == "edit_code":
                self._handle_edit(code)
            elif action_type == "submit_solution":
                self._handle_scored_action(
                    action_type=action_type,
                    candidate_code=self._state.current_code,
                    include_hidden=True,
                )
                self._done = True
else:
self._apply_invalid_action(f"Unsupported action_type '{action_type}'.")
self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)
if self._state.attempts_remaining == 0 and not self._done:
self._auto_submit()
self._state.done = self._done
return self._build_observation()
except Exception as exc:
self._apply_invalid_action(f"Step failure handled: {_safe_text(exc, 'unknown_error')}")
self._state.done = self._done
return self._build_observation()
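    # Step usage sketch (field names on PythonCodeReviewAction are assumed
    # from the getattr() lookups above; fixed_code is any candidate source):
    #   obs = env.step(PythonCodeReviewAction(action_type="run_tests"))
    #   obs = env.step(PythonCodeReviewAction(action_type="edit_code", code=fixed_code))
    #   obs.reward_details.value  # bounded, shaped per-step reward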
@property
def state(self) -> PythonCodeReviewState:
"""Return a deep copy of the current environment state."""
try:
return self._state.model_copy(deep=True)
except Exception:
return PythonCodeReviewState(episode_id=str(uuid4()))
def list_task_summaries(self) -> list[object]:
"""Return public task summaries."""
try:
return list_task_summaries()
except Exception:
return []
def get_task(self, task_id: str) -> object:
"""Return a single public task descriptor."""
return self._select_task(task_id).to_descriptor()
def health(self) -> HealthResponse:
"""Return a simple health response."""
return HealthResponse(task_count=len(self._task_order))
def grade_task_submission(self, task_id: str, code: str) -> TaskGrade:
"""Grade a task submission outside an episode without raising."""
try:
task = self._select_task(task_id)
return self._safe_grade(task=task, candidate_code=code, include_hidden=True)
except Exception as exc:
return TaskGrade(score=0.0, details={"error": _safe_text(exc, "grading_failed")})
def run_tests(self, code: str, include_hidden: bool = False) -> tuple[float, dict[str, int], TaskGrade]:
"""Run deterministic grading and return score plus test summary."""
task = self._task or self._select_task(None)
grade = self._safe_grade(task=task, candidate_code=code, include_hidden=include_hidden)
return (
_clamp(grade.score),
{"passed": int(grade.tests_passed), "total": int(grade.tests_total)},
grade,
)
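    # Example: the tuple unpacks as (clamped score, pass summary, full grade).
    #   score, summary, grade = env.run_tests(candidate_code, include_hidden=False)
    #   summary  # e.g. {"passed": 3, "total": 5} (illustrative counts)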
def apply_action(self, action: PythonCodeReviewAction) -> str:
"""Return the candidate code implied by the action."""
if getattr(action, "action_type", "") == "edit_code":
code = getattr(action, "code", None)
return str(code) if code is not None else self._state.current_code
return self._state.current_code
def compute_reward(
self,
action_type: str,
previous_metrics: dict[str, float],
current_metrics: dict[str, float],
grade: TaskGrade,
code_changed: bool,
invalid_action: bool = False,
) -> RewardDetails:
"""Compute a bounded dynamic reward with progress and efficiency shaping."""
prev_score = _clamp(previous_metrics.get("score", 0.0))
curr_score = _clamp(current_metrics.get("score", 0.0))
score_delta = curr_score - prev_score
test_delta = current_metrics.get("test_fraction", 0.0) - previous_metrics.get("test_fraction", 0.0)
syntax_delta = current_metrics.get("syntax_score", 0.0) - previous_metrics.get("syntax_score", 0.0)
quality_delta = current_metrics.get("quality_score", 0.0) - previous_metrics.get("quality_score", 0.0)
step_penalty = BASE_STEP_PENALTY + (ANALYZE_STEP_PENALTY if action_type == "analyze_code" else 0.0)
repeated_penalty = REPEATED_ACTION_PENALTY if action_type == self._last_action_type else 0.0
no_progress = (
score_delta <= 1e-9
and test_delta <= 1e-9
and syntax_delta <= 1e-9
and quality_delta <= 1e-9
and not code_changed
)
stagnation_penalty = NO_PROGRESS_PENALTY if no_progress and not invalid_action else 0.0
        # Fold the per-step efficiency costs (step and repeat penalties) in
        # with any score regression; all three surface via the same field.
        regression_penalty = max(-score_delta, 0.0) * 0.6 + repeated_penalty + step_penalty
invalid_penalty = INVALID_ACTION_PENALTY if invalid_action else 0.0
timeout_penalty = TIMEOUT_PENALTY if bool(grade.timed_out) else 0.0
progress_reward = max(score_delta, 0.0) * 0.7
syntax_reward = max(syntax_delta, 0.0) * 0.5
test_reward = max(test_delta, 0.0) * 1.0
quality_bonus = max(quality_delta, 0.0) * 0.2
correctness_bonus = SUBMIT_COMPLETION_BONUS if action_type == "submit_solution" and curr_score >= 0.999 else 0.0
reward_value = (
progress_reward
+ syntax_reward
+ test_reward
+ quality_bonus
+ correctness_bonus
- stagnation_penalty
- regression_penalty
- invalid_penalty
- timeout_penalty
)
reward_value = max(-1.0, min(1.0, round(reward_value, 6)))
return RewardDetails(
value=reward_value,
syntax_reward=round(syntax_reward, 6),
test_reward=round(test_reward, 6),
quality_bonus=round(quality_bonus, 6),
correctness_bonus=round(correctness_bonus, 6),
progress_delta=round(progress_reward, 6),
stagnation_penalty=round(stagnation_penalty, 6),
regression_penalty=round(regression_penalty, 6),
invalid_action_penalty=round(invalid_penalty, 6),
timeout_penalty=round(timeout_penalty, 6),
reason=f"{action_type} reward computed safely",
prev_score=round(prev_score, 6),
curr_score=round(curr_score, 6),
code_changed=bool(code_changed),
)
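    # Worked example (illustrative numbers): an edit_code step lifting score
    # 0.40 -> 0.70 and test_fraction 0.40 -> 0.60, with no repeat, timeout,
    # or regression, yields
    #   0.30 * 0.7 (progress) + 0.20 * 1.0 (tests) - 0.02 (base step penalty) = 0.39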
def _safe_task_order(self) -> list[str]:
"""Load deterministic task ids with a hard fallback."""
try:
loaded = list(task_ids())
if loaded:
return [str(task_id) for task_id in loaded]
except Exception:
pass
return ["syntax-fix-easy", "bug-fix-medium", "optimization-hard"]
def _blank_metrics(self) -> dict[str, float]:
"""Return an empty metric snapshot."""
return {
"score": 0.0,
"test_fraction": 0.0,
"syntax_score": 0.0,
"quality_score": 0.0,
}
def _select_task(self, task_id: Optional[str]) -> TaskSpec:
"""Select the requested task or advance deterministically."""
try:
if task_id:
task = load_task(task_id)
if task.task_id in self._task_order:
self._task_cursor = self._task_order.index(task.task_id)
return task
except Exception:
pass
try:
self._task_cursor = (self._task_cursor + 1) % len(self._task_order)
return load_task(self._task_order[self._task_cursor])
except Exception:
return load_task("syntax-fix-easy")
def _safe_grade(self, task: TaskSpec, candidate_code: str, include_hidden: bool) -> TaskGrade:
"""Run grading without allowing exceptions to escape."""
try:
return grade_task(candidate_code, task, include_hidden=include_hidden)
except Exception as exc:
return TaskGrade(
score=0.0,
syntax_score=0.0,
tests_passed=0,
tests_total=max(len(task.visible_tests), 1),
details={"compile_error": "", "error": _safe_text(exc, "grading_failed")},
)
def _metrics_from_grade(self, grade: TaskGrade) -> dict[str, float]:
"""Derive normalized reward metrics from a grading result."""
tests_total = max(int(grade.tests_total), 0)
tests_passed = max(int(grade.tests_passed), 0)
test_fraction = (tests_passed / tests_total) if tests_total else _clamp(grade.syntax_score)
return {
"score": _clamp(grade.score),
"test_fraction": _clamp(test_fraction),
"syntax_score": _clamp(grade.syntax_score),
"quality_score": _clamp(grade.quality_score),
}
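    # Example: when a grade reports no runnable tests (tests_total == 0), the
    # test_fraction above falls back to the clamped syntax score:
    #   TaskGrade(tests_total=0, syntax_score=1.0)  ->  test_fraction == 1.0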
def _format_test_results(self, grade: TaskGrade, include_hidden: bool) -> str:
"""Format test execution results for the observation."""
compile_error = _safe_text(grade.details.get("compile_error", ""), "")
scope = "all checks" if include_hidden else "visible checks"
if compile_error:
return f"{scope}: compile error: {compile_error}"
if grade.timed_out:
return f"{scope}: execution timed out"
if self._task and self._task.task_kind == "syntax_fix":
return "visible checks: code compiles successfully"
return f"{scope}: {int(grade.tests_passed)}/{int(grade.tests_total)} passing"
def _build_status(self, action_type: str, grade: TaskGrade) -> str:
"""Build a human-readable status message."""
if action_type == "submit_solution":
return f"Solution submitted. Final score: {_clamp(grade.score):.3f}"
if action_type == "edit_code":
if grade.details.get("compile_error"):
return "Code updated, but syntax issues remain."
return "Code updated and evaluated."
if action_type == "run_tests":
return "Test run completed."
if action_type == "analyze_code":
return "Analysis completed."
return "Action handled safely."
def _apply_grade_to_state(self, grade: TaskGrade, include_hidden: bool) -> None:
"""Update environment state from the latest grading result."""
compile_error = _safe_text(grade.details.get("compile_error", ""), "")
self._state.score = _clamp(grade.score)
self._state.errors = compile_error
self._state.test_results = self._format_test_results(grade, include_hidden=include_hidden)
def _handle_scored_action(self, action_type: str, candidate_code: str, include_hidden: bool) -> None:
"""Grade code, update state, and compute reward for a valid action."""
task = self._task or self._select_task(None)
previous_metrics = dict(self._metrics)
prior_code = self._state.current_code
code_changed = candidate_code.strip() != prior_code.strip()
if action_type == "edit_code":
self._state.current_code = candidate_code
        grade = self._safe_grade(
            task=task,
            candidate_code=self._state.current_code,
            include_hidden=include_hidden,
        )
current_metrics = self._metrics_from_grade(grade)
self._apply_grade_to_state(grade, include_hidden=include_hidden)
self._last_reward = self.compute_reward(
action_type=action_type,
previous_metrics=previous_metrics,
current_metrics=current_metrics,
grade=grade,
code_changed=code_changed,
invalid_action=False,
)
self._last_status = self._build_status(action_type, grade)
self._metrics = current_metrics
self._last_action_type = action_type
self._append_history(action_type, self._last_status, self._last_reward.value)
def _handle_edit(self, code: Optional[str]) -> None:
"""Validate edit input and evaluate the new candidate code."""
safe_code = (code or "").strip()
if not safe_code:
self._apply_invalid_action("edit_code requires code parameter.")
return
self._handle_scored_action(action_type="edit_code", candidate_code=safe_code, include_hidden=False)
def _apply_invalid_action(self, reason: str) -> None:
"""Record an invalid action without crashing the episode."""
previous_metrics = dict(self._metrics)
grade = TaskGrade(score=previous_metrics["score"], syntax_score=previous_metrics["syntax_score"])
self._last_reward = self.compute_reward(
action_type="invalid",
previous_metrics=previous_metrics,
current_metrics=previous_metrics,
grade=grade,
code_changed=False,
invalid_action=True,
)
self._last_status = reason
self._append_history("analyze_code", reason, self._last_reward.value)
def _auto_submit(self) -> None:
"""Finalize the episode when attempts are exhausted."""
task = self._task or self._select_task(None)
grade = self._safe_grade(task=task, candidate_code=self._state.current_code, include_hidden=True)
self._apply_grade_to_state(grade, include_hidden=True)
self._done = True
self._state.done = True
self._last_status = f"Auto-submitted. Final score: {_clamp(grade.score):.3f}"
def _append_history(self, action_type: str, status: str, reward: float) -> None:
"""Append one action record to the episode history."""
try:
stable_action = action_type if action_type in VALID_ACTIONS else "analyze_code"
self._state.history.append(
HistoryEntry(
step=max(int(self._state.step_count), 0),
action_type=stable_action,
status=_safe_text(status, "handled"),
reward=float(reward),
)
)
except Exception:
pass
def _build_observation(self) -> PythonCodeReviewObservation:
"""Build a valid observation from current state."""
task = self._task
try:
return PythonCodeReviewObservation(
task_id=self._state.task_id or "",
title=task.title if task else "",
difficulty=self._state.difficulty or "easy",
task_kind=self._state.task_kind,
task_description=task.task_description if task else "",
current_code=self._state.current_code,
errors=self._state.errors,
test_results=self._state.test_results,
visible_tests=list(task.visible_tests) if task else [],
history=list(self._state.history),
attempts_remaining=max(int(self._state.attempts_remaining), 0),
last_action_status=self._last_status,
score=_clamp(self._state.score),
reward_details=self._last_reward,
reward=self._last_reward.value,
done=bool(self._state.done),
metadata={
"prev_score": self._last_reward.prev_score,
"curr_score": self._last_reward.curr_score,
},
)
except Exception as exc:
return PythonCodeReviewObservation(
task_id=self._state.task_id or "",
title="",
difficulty="easy",
task_kind=None,
task_description="",
current_code=getattr(self._state, "current_code", ""),
errors=_safe_text(exc, "observation_build_failed"),
test_results="visible checks: unavailable",
visible_tests=[],
history=[],
attempts_remaining=0,
last_action_status="Observation fallback returned safely.",
score=0.0,
reward_details=RewardDetails(value=0.0, reason="Observation fallback."),
reward=0.0,
done=bool(getattr(self._state, "done", False)),
metadata={},
)
# Convenience aliases for the environment class.
PythonEnvironment = PythonCodeReviewEnvironment
CodeReviewEnvironment = PythonCodeReviewEnvironment
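# Minimal local smoke test, a sketch only: it assumes PythonCodeReviewAction
# accepts `action_type` and `code` keyword arguments (the fields read via
# getattr() in step()) and that at least one task is registered.
if __name__ == "__main__":
    env = PythonCodeReviewEnvironment(verbose=True)
    obs = env.reset()
    print("task:", obs.task_id, "attempts:", obs.attempts_remaining)
    obs = env.step(PythonCodeReviewAction(action_type="run_tests"))
    print("status:", obs.last_action_status, "reward:", obs.reward)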