Spaces:
Sleeping
Sleeping
| import asyncio | |
| from typing import Optional, Any | |
| from openenv.core.env_server import Environment | |
| from models import Action, Observation, StepResult, State | |
| from env import DevOpsIncidentEnv as DevOpsIncidentLogic | |
| from generator.incident_factory import IncidentFactory | |
| from tasks.task_generated import GeneratedTask | |
| _factory = IncidentFactory() | |
| class DevOpsEnvironment(Environment[Action, Observation, State]): | |
| """Server-side environment wrapper using openenv-core.""" | |
| SUPPORTS_CONCURRENT_SESSIONS = True | |
| def __init__(self): | |
| super().__init__() | |
| self._logic: Optional[DevOpsIncidentLogic] = None | |
| async def reset( | |
| self, | |
| seed: Optional[int] = None, | |
| episode_id: Optional[str] = None, | |
| task_id: str = "easy", | |
| **kwargs: Any | |
| ) -> Observation: | |
| if task_id == "generated": | |
| actual_seed = seed if seed is not None else 42 | |
| incident = _factory.generate(actual_seed) | |
| task = GeneratedTask(incident_dict=incident) | |
| state = task.initialize() | |
| # Create the logic instance with 'easy' to bypass validation | |
| self._logic = DevOpsIncidentLogic(task_id="easy", seed=actual_seed) | |
| self._logic.task_id = "generated" | |
| self._logic._task = task | |
| self._logic._internal_state = state | |
| return state._build_observation() | |
| else: | |
| self._logic = DevOpsIncidentLogic(task_id=task_id, seed=seed) | |
| return self._logic.reset() | |
| async def step( | |
| self, | |
| action: Action, | |
| timeout_s: Optional[float] = None, | |
| **kwargs: Any | |
| ) -> StepResult: | |
| if self._logic is None: | |
| raise RuntimeError("Environment not reset") | |
| return self._logic.step(action) | |
| def state(self) -> State: | |
| if self._logic is None: | |
| raise RuntimeError("Environment not reset") | |
| return self._logic.state() | |