| |
| |
|
|
| """ |
| RANSEnvironment |
| =============== |
| OpenEnv ``Environment`` subclass that wraps the 2-D spacecraft simulator and |
| the RANS task suite. |
| |
| Supported tasks (set via RANS_TASK env-var or constructor argument): |
| β’ GoToPosition β reach a target (x, y) |
| β’ GoToPose β reach a target (x, y, ΞΈ) |
| β’ TrackLinearVelocity β maintain (vx_t, vy_t) |
| β’ TrackLinearAngularVelocity β maintain (vx_t, vy_t, Ο_t) |
| |
| The environment follows the RANS paper (arXiv:2310.07393) physics and reward |
| formulations, adapted to run in CPU-only Docker containers without Isaac Gym. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import os |
| import uuid |
| from typing import Any, Dict, Optional |
|
|
| import numpy as np |
|
|
| try: |
| from openenv.core.env_server.interfaces import Action, Environment, Observation |
| except ImportError: |
| from pydantic import BaseModel as Action |
| from pydantic import BaseModel as Environment |
| from pydantic import BaseModel as Observation |
|
|
| try: |
| |
| from rans_env.models import SpacecraftAction, SpacecraftObservation, SpacecraftState |
| from rans_env.server.spacecraft_physics import Spacecraft2D, SpacecraftConfig |
| from rans_env.server.tasks import TASK_REGISTRY |
| except ImportError: |
| |
| import sys, os as _os |
| sys.path.insert(0, _os.path.dirname(_os.path.dirname(__file__))) |
| from models import SpacecraftAction, SpacecraftObservation, SpacecraftState |
| from server.spacecraft_physics import Spacecraft2D, SpacecraftConfig |
| from server.tasks import TASK_REGISTRY |
|
|
|
|
| class RANSEnvironment(Environment): |
| """ |
| RANS spacecraft navigation environment for OpenEnv. |
| |
| References |
| ---------- |
| El-Hariry, Richard, Olivares-Mendez (2023). |
| "RANS: Highly-Parallelised Simulator for Reinforcement Learning based |
| Autonomous Navigating Spacecrafts." arXiv:2310.07393. |
| """ |
|
|
| def __init__( |
| self, |
| task: str = "GoToPosition", |
| spacecraft_config: Optional[SpacecraftConfig] = None, |
| task_config: Optional[Dict[str, Any]] = None, |
| max_episode_steps: int = 500, |
| initial_pos_range: float = 2.0, |
| initial_vel_range: float = 0.1, |
| ) -> None: |
| """ |
| Parameters |
| ---------- |
| task: |
| One of TASK_REGISTRY keys. Overridden by RANS_TASK env-var. |
| spacecraft_config: |
| Physical platform configuration. Uses 8-thruster MFP2D default. |
| task_config: |
| Dict of task hyper-parameters forwarded to the task constructor. |
| max_episode_steps: |
| Hard step limit per episode (overrides RANS_MAX_STEPS env-var). |
| initial_pos_range: |
| Half-width of the uniform distribution for random initial position. |
| initial_vel_range: |
| Half-width for random initial velocities. |
| """ |
| |
| task = os.environ.get("RANS_TASK", task) |
| max_episode_steps = int( |
| os.environ.get("RANS_MAX_STEPS", str(max_episode_steps)) |
| ) |
|
|
| if task not in TASK_REGISTRY: |
| raise ValueError( |
| f"Unknown task '{task}'. " |
| f"Available: {sorted(TASK_REGISTRY.keys())}" |
| ) |
|
|
| self._task_name = task |
| self._max_episode_steps = max_episode_steps |
| self._initial_pos_range = initial_pos_range |
| self._initial_vel_range = initial_vel_range |
|
|
| |
| if spacecraft_config is None: |
| n_env = os.environ.get("RANS_NUM_THRUSTERS") |
| if n_env is not None: |
| n = int(n_env) |
| spacecraft_config = SpacecraftConfig.from_num_thrusters(n) |
| else: |
| spacecraft_config = SpacecraftConfig.default_8_thruster() |
| self._spacecraft = Spacecraft2D(spacecraft_config) |
|
|
| |
| self._task = TASK_REGISTRY[task](task_config or {}) |
|
|
| |
| self._step_count: int = 0 |
| self._total_reward: float = 0.0 |
| self._ep_state = SpacecraftState(task=self._task_name) |
|
|
| |
| |
| |
|
|
| def reset(self) -> Observation: |
| """Start a new episode with a randomised initial spacecraft state.""" |
| init_state = self._sample_initial_state() |
| self._spacecraft.reset(init_state) |
|
|
| task_info = self._task.reset(self._spacecraft.state) |
|
|
| self._step_count = 0 |
| self._total_reward = 0.0 |
| self._ep_state = SpacecraftState( |
| episode_id=str(uuid.uuid4()), |
| step_count=0, |
| task=self._task_name, |
| **self._physical_state_dict(), |
| ) |
|
|
| return self._make_observation(reward=0.0, done=False, info=task_info) |
|
|
| def step(self, action: Action) -> Observation: |
| """ |
| Advance the simulation by one step. |
| |
| Control mode is selected automatically based on which fields are set: |
| |
| 1. **Thrusters** (``action.thrusters`` is not None): |
| List of per-thruster activations β [0, 1]. |
| 2. **Force/torque** (``action.fx``, ``action.fy``, or ``action.torque`` |
| is not None): |
| Direct world-frame force/torque β bypasses thruster geometry. |
| 3. **Velocity target** (``action.vx_target``, ``action.vy_target``, or |
| ``action.omega_target`` is not None): |
| PD controller drives the spacecraft toward the requested velocities. |
| """ |
| if not hasattr(action, "thrusters"): |
| raise ValueError( |
| f"Expected SpacecraftAction, received {type(action).__name__}." |
| ) |
|
|
| |
| if action.thrusters is not None: |
| activations = np.array(action.thrusters, dtype=np.float64) |
| n = self._spacecraft.n_thrusters |
| if len(activations) != n: |
| padded = np.zeros(n, dtype=np.float64) |
| padded[: min(len(activations), n)] = activations[:n] |
| activations = padded |
| self._spacecraft.step(activations) |
|
|
| |
| elif any(v is not None for v in [action.fx, action.fy, action.torque]): |
| self._spacecraft.step_force_torque( |
| fx_world=float(action.fx or 0.0), |
| fy_world=float(action.fy or 0.0), |
| torque=float(action.torque or 0.0), |
| ) |
|
|
| |
| elif any( |
| v is not None |
| for v in [action.vx_target, action.vy_target, action.omega_target] |
| ): |
| self._spacecraft.step_velocity_target( |
| vx_target=float(action.vx_target or 0.0), |
| vy_target=float(action.vy_target or 0.0), |
| omega_target=float(action.omega_target or 0.0), |
| ) |
|
|
| |
| else: |
| self._spacecraft.step(np.zeros(self._spacecraft.n_thrusters)) |
| self._step_count += 1 |
|
|
| |
| reward, goal_reached, info = self._task.compute_reward( |
| self._spacecraft.state |
| ) |
| self._total_reward += reward |
|
|
| |
| done = goal_reached or (self._step_count >= self._max_episode_steps) |
|
|
| |
| self._ep_state = SpacecraftState( |
| episode_id=self._ep_state.episode_id, |
| step_count=self._step_count, |
| task=self._task_name, |
| total_reward=self._total_reward, |
| goal_reached=goal_reached, |
| **self._physical_state_dict(), |
| ) |
|
|
| return self._make_observation(reward=reward, done=done, info=info) |
|
|
| @property |
| def state(self) -> SpacecraftState: |
| return self._ep_state |
|
|
| |
| |
| |
|
|
| def _sample_initial_state(self) -> np.ndarray: |
| """Uniform random initial state (small velocities, random pose).""" |
| r = self._initial_pos_range |
| v = self._initial_vel_range |
| return np.array( |
| [ |
| np.random.uniform(-r, r), |
| np.random.uniform(-r, r), |
| np.random.uniform(-math.pi, math.pi), |
| np.random.uniform(-v, v), |
| np.random.uniform(-v, v), |
| np.random.uniform(-v, v), |
| ], |
| dtype=np.float64, |
| ) |
|
|
| def _physical_state_dict(self) -> Dict[str, float]: |
| s = self._spacecraft.state |
| return { |
| "x": float(s[0]), |
| "y": float(s[1]), |
| "heading_rad": float(s[2]), |
| "vx": float(s[3]), |
| "vy": float(s[4]), |
| "angular_velocity_rads": float(s[5]), |
| } |
|
|
| def _make_observation( |
| self, reward: float, done: bool, info: Dict[str, Any] |
| ) -> SpacecraftObservation: |
| task_obs = self._task.get_observation(self._spacecraft.state) |
| return SpacecraftObservation( |
| state_obs=task_obs.tolist(), |
| thruster_transforms=self._spacecraft.get_thruster_transforms().tolist(), |
| thruster_masks=self._spacecraft.get_thruster_masks().tolist(), |
| mass=self._spacecraft.config.mass, |
| inertia=self._spacecraft.config.inertia, |
| task=self._task_name, |
| reward=float(reward), |
| done=bool(done), |
| info={**info, "step": self._step_count}, |
| ) |
|
|