| | """Trajectory serialisation and dataset utilities.
|
| |
|
| | A ``Trajectory`` stores the full history of one episode (task, actions,
|
| | observations, rewards, latent-state snapshots) in a format that supports:
|
| | - offline RL training
|
| | - imitation learning from expert demonstrations
|
| | - evaluation / replay
|
| | - simulator calibration
|
| | """
|
| |
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | from dataclasses import dataclass, field
|
| | from pathlib import Path
|
| | from typing import Any, Dict, List, Optional
|
| |
|
| | from models import (
|
| | ExperimentAction,
|
| | ExperimentObservation,
|
| | TaskSpec,
|
| | )
|
| |
|
| |
|
| | @dataclass
|
| | class TrajectoryStep:
|
| | step_index: int
|
| | action: Dict[str, Any]
|
| | observation: Dict[str, Any]
|
| | reward: float
|
| | done: bool
|
| | reward_breakdown: Dict[str, float] = field(default_factory=dict)
|
| | latent_snapshot: Optional[Dict[str, Any]] = None
|
| |
|
| |
|
| | @dataclass
|
| | class Trajectory:
|
| | """Complete record of one environment episode."""
|
| |
|
| | episode_id: str
|
| | task: Dict[str, Any]
|
| | steps: List[TrajectoryStep] = field(default_factory=list)
|
| | total_reward: float = 0.0
|
| | success: bool = False
|
| | metadata: Dict[str, Any] = field(default_factory=dict)
|
| |
|
| |
|
| |
|
| | def add_step(
|
| | self,
|
| | action: ExperimentAction,
|
| | observation: ExperimentObservation,
|
| | reward: float,
|
| | done: bool,
|
| | reward_breakdown: Optional[Dict[str, float]] = None,
|
| | latent_snapshot: Optional[Dict[str, Any]] = None,
|
| | ) -> None:
|
| | self.steps.append(TrajectoryStep(
|
| | step_index=len(self.steps),
|
| | action=action.model_dump(),
|
| | observation=observation.model_dump(),
|
| | reward=reward,
|
| | done=done,
|
| | reward_breakdown=reward_breakdown or {},
|
| | latent_snapshot=latent_snapshot,
|
| | ))
|
| | self.total_reward += reward
|
| | if done:
|
| | self.success = reward > 0
|
| |
|
| |
|
| |
|
| | def to_dict(self) -> Dict[str, Any]:
|
| | return {
|
| | "episode_id": self.episode_id,
|
| | "task": self.task,
|
| | "steps": [
|
| | {
|
| | "step_index": s.step_index,
|
| | "action": s.action,
|
| | "observation": s.observation,
|
| | "reward": s.reward,
|
| | "done": s.done,
|
| | "reward_breakdown": s.reward_breakdown,
|
| | "latent_snapshot": s.latent_snapshot,
|
| | }
|
| | for s in self.steps
|
| | ],
|
| | "total_reward": self.total_reward,
|
| | "success": self.success,
|
| | "metadata": self.metadata,
|
| | }
|
| |
|
| | def save(self, path: str | Path) -> None:
|
| | p = Path(path)
|
| | p.parent.mkdir(parents=True, exist_ok=True)
|
| | with open(p, "w") as f:
|
| | json.dump(self.to_dict(), f, indent=2, default=str)
|
| |
|
| | @classmethod
|
| | def load(cls, path: str | Path) -> "Trajectory":
|
| | with open(path) as f:
|
| | d = json.load(f)
|
| | traj = cls(
|
| | episode_id=d["episode_id"],
|
| | task=d["task"],
|
| | total_reward=d.get("total_reward", 0.0),
|
| | success=d.get("success", False),
|
| | metadata=d.get("metadata", {}),
|
| | )
|
| | for s in d.get("steps", []):
|
| | traj.steps.append(TrajectoryStep(**s))
|
| | return traj
|
| |
|
| |
|
| | class TrajectoryDataset:
|
| | """In-memory collection of trajectories with convenience accessors."""
|
| |
|
| | def __init__(self, trajectories: Optional[List[Trajectory]] = None):
|
| | self.trajectories: List[Trajectory] = trajectories or []
|
| |
|
| | def add(self, traj: Trajectory) -> None:
|
| | self.trajectories.append(traj)
|
| |
|
| | def __len__(self) -> int:
|
| | return len(self.trajectories)
|
| |
|
| | def __getitem__(self, idx: int) -> Trajectory:
|
| | return self.trajectories[idx]
|
| |
|
| | def filter_successful(self) -> "TrajectoryDataset":
|
| | return TrajectoryDataset([t for t in self.trajectories if t.success])
|
| |
|
| | def save_dir(self, directory: str | Path) -> None:
|
| | d = Path(directory)
|
| | d.mkdir(parents=True, exist_ok=True)
|
| | for t in self.trajectories:
|
| | t.save(d / f"{t.episode_id}.json")
|
| |
|
| | @classmethod
|
| | def load_dir(cls, directory: str | Path) -> "TrajectoryDataset":
|
| | d = Path(directory)
|
| | trajs = [Trajectory.load(p) for p in sorted(d.glob("*.json"))]
|
| | return cls(trajs)
|
| |
|
| | def summary(self) -> Dict[str, Any]:
|
| | if not self.trajectories:
|
| | return {"n": 0}
|
| | rewards = [t.total_reward for t in self.trajectories]
|
| | lengths = [len(t.steps) for t in self.trajectories]
|
| | success_rate = sum(1 for t in self.trajectories if t.success) / len(self.trajectories)
|
| | return {
|
| | "n": len(self.trajectories),
|
| | "success_rate": success_rate,
|
| | "mean_reward": sum(rewards) / len(rewards),
|
| | "mean_length": sum(lengths) / len(lengths),
|
| | "max_reward": max(rewards),
|
| | "min_reward": min(rewards),
|
| | }
|
| |
|