# Commit fc3fbaf (UjjwalPardeshi): "fix: add has_grader: true to all tasks
# for evaluator grader check" — stray VCS metadata that was pasted above the
# module docstring; converted to a comment so the module parses.
"""FastAPI app — openenv create_app() + custom routes."""
from __future__ import annotations
import asyncio
import json
import logging
import sys
from typing import Optional
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from openenv.core.env_server.http_server import create_app
from ml_training_debugger.models import MLTrainingAction, MLTrainingObservation
from server._baseline_results import get_last_grader_result
from server.environment import MLTrainingEnvironment
# Structured JSON logging
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    Base fields are always emitted (time/level/msg); the optional context
    fields below are copied through only when a caller attached them to the
    record (e.g. via ``logger.info(..., extra={...})``).
    """

    # Optional LogRecord attributes forwarded into the JSON payload,
    # in the order they should appear in the output object.
    _EXTRA_FIELDS = ("session_id", "task_id", "step_count", "action_type", "score")

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialized as a compact JSON string."""
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        for field in self._EXTRA_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)
# Install the JSON formatter on the ROOT logger so that every logger in the
# process that propagates (the default) emits one JSON object per line on
# stdout, ready for line-oriented log shipping.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
# Replace (not append to) the root handler list to avoid duplicate lines if
# another component already configured a handler.
logging.root.handlers = [handler]
logging.root.setLevel(logging.INFO)
# Module-level logger for this file's own messages.
logger = logging.getLogger(__name__)
# All 7 tasks
# Static task registry served by /tasks, /curriculum, and /health.  Fields:
#   id         — task identifier
#   difficulty — easy / medium / hard label
#   max_steps  — per-episode step budget
#   has_grader — True on every task; required by the evaluator's grader check
ALL_TASKS = [
    {"id": "task_001", "difficulty": "easy", "max_steps": 20, "has_grader": True},
    {"id": "task_002", "difficulty": "easy", "max_steps": 20, "has_grader": True},
    {"id": "task_003", "difficulty": "medium", "max_steps": 25, "has_grader": True},
    {"id": "task_004", "difficulty": "medium", "max_steps": 25, "has_grader": True},
    {"id": "task_005", "difficulty": "hard", "max_steps": 30, "has_grader": True},
    {"id": "task_006", "difficulty": "hard", "max_steps": 30, "has_grader": True},
    {"id": "task_007", "difficulty": "hard", "max_steps": 25, "has_grader": True},
]
# create_app takes the class (factory), not an instance
app: FastAPI = create_app(
    MLTrainingEnvironment,
    MLTrainingAction,
    MLTrainingObservation,
    env_name="pytorch_training_debugger",
    max_concurrent_envs=5,
)
# Override framework's /health route with our custom version
# (the framework-registered route is removed first so the handler defined
# below is the one that serves /health).
app.routes[:] = [
    r for r in app.routes if not (hasattr(r, "path") and r.path == "/health")
]
# Thread-safe baseline lock (Fix #14)
# asyncio.Lock guarding /baseline so at most one baseline run is in flight.
_baseline_lock = asyncio.Lock()
@app.get("/")
def root() -> RedirectResponse:
"""Redirect root to dashboard."""
return RedirectResponse(url="/dashboard")
@app.get("/health")
def health_check() -> dict:
"""Health check — required by hackathon auto-validator."""
return {"status": "healthy", "tasks": len(ALL_TASKS)}
@app.get("/dashboard", response_class=HTMLResponse)
def get_dashboard() -> str:
"""Serve live diagnostic dashboard."""
import pathlib
html_path = pathlib.Path(__file__).parent / "dashboard.html"
return html_path.read_text()
@app.get("/validation-report")
def get_validation_report() -> dict:
"""Serve pre-computed simulation fidelity report."""
import pathlib
report_path = (
pathlib.Path(__file__).parent.parent
/ "validation"
/ "reports"
/ "fidelity_report.json"
)
if report_path.exists():
return json.loads(report_path.read_text())
return {
"error": "Validation report not yet generated. "
"Run: python validation/run_all_validations.py"
}
@app.get("/curriculum")
def get_curriculum() -> dict:
"""Recommended task order (easy to hard, with difficulty scaling)."""
curriculum: list[dict] = []
for task in ALL_TASKS:
for level in [1, 3, 5]:
curriculum.append({
"task_id": task["id"],
"difficulty": task["difficulty"],
"difficulty_level": level,
"max_steps": task["max_steps"],
})
return {"curriculum": curriculum, "total_episodes": len(curriculum)}
@app.get("/leaderboard")
def get_leaderboard() -> dict:
"""Sorted leaderboard of completed episode scores."""
from server._baseline_results import _last_results
entries = [
v for k, v in _last_results.items()
if k != "_latest" and isinstance(v, dict)
]
sorted_entries = sorted(
entries, key=lambda x: x.get("score", 0), reverse=True
)
return {"entries": sorted_entries, "total": len(sorted_entries)}
@app.get("/replay/{episode_id}")
def get_replay(episode_id: str) -> dict:
"""Return full action/observation trace for a completed episode."""
from server._baseline_results import _last_results
result = _last_results.get(episode_id)
if result is None:
return {"error": f"Episode '{episode_id}' not found"}
return {"episode_id": episode_id, **result}
@app.get("/tasks")
def get_tasks() -> list[dict]:
"""Return task list with IDs, difficulties, and action schema."""
schema = MLTrainingAction.model_json_schema()
return [{**task, "action_schema": schema} for task in ALL_TASKS]
@app.post("/grader")
def post_grader(session_id: Optional[str] = None) -> dict:
"""Return grader score for most recently completed episode."""
result = get_last_grader_result(session_id)
if result is None:
return {"score": None, "error": "no_completed_episode"}
return result
@app.post("/baseline", response_model=None)
async def post_baseline() -> JSONResponse | dict:
"""Trigger baseline run, return scores for all tasks."""
if _baseline_lock.locked():
return JSONResponse(
status_code=409,
content={"error": "baseline_in_progress"},
)
async with _baseline_lock:
from server._heuristic import run_baseline_all_tasks
scores = await asyncio.get_event_loop().run_in_executor(
None, run_baseline_all_tasks
)
return {"scores": scores}
def main() -> None:
    """Entry point for running the server."""
    import uvicorn

    # Bind on all interfaces; 7860 is the conventional HF Spaces port.
    host, port = "0.0.0.0", 7860
    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()