Spaces:
Running
Running
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| GhostExec simulated world, agent step (Phases 2–3), and reward (Phase 4). | |
| Scenario payloads load from scenarios/*.json. Observations are plain-text briefings. | |
| Invalid actions return a structured error in observation metadata without raising. | |
| Rewards aggregate conflict / relationship / task scores and log each step to outputs/logs/. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import ( | |
| Contact, | |
| Email, | |
| GhostexecAction, | |
| GhostexecObservation, | |
| Meeting, | |
| Mood, | |
| RewardBreakdown, | |
| Task, | |
| TaskStatus, | |
| WorldState, | |
| ) | |
| except ImportError: | |
| from models import ( | |
| Contact, | |
| Email, | |
| GhostexecAction, | |
| GhostexecObservation, | |
| Meeting, | |
| Mood, | |
| RewardBreakdown, | |
| Task, | |
| TaskStatus, | |
| WorldState, | |
| ) | |
| try: | |
| from . import reward as _reward | |
| except ImportError: | |
| try: | |
| from server import reward as _reward | |
| except ImportError: | |
| import reward as _reward # type: ignore[no-redef] | |
# Sort rank per email priority label; a lower rank sorts earlier in the unread list.
_PRIORITY_RANK: dict[str, int] = {
    label: rank for rank, label in enumerate(("critical", "high", "normal", "low"))
}

# Short label printed in the briefing for each relationship key.
_REL_DISPLAY: dict[str, str] = dict(
    board_member="Board",
    spouse="Spouse",
    investor="Investor",
    direct_report="Direct report",
    client="Client",
    friend="Friend",
    team_member="Team",
)

# Reward returned when step() is called on an episode that has already finished.
_INVALID_ACTION_REWARD = -0.25

# Reward reported on the observation produced by reset().
_DEFAULT_STEP_REWARD = 0.0

# Mood scale from worst to best; mood shifts move along this tuple.
_MOOD_ORDER: tuple[Mood, ...] = ("furious", "angry", "annoyed", "neutral", "happy")
def _default_scenario_path() -> Path:
    """Return the bundled default scenario file.

    The path is ``scenarios/phase2_core.json`` under the directory two levels
    above this module file.
    """
    package_root = Path(__file__).resolve().parent.parent
    return package_root.joinpath("scenarios", "phase2_core.json")
def _parse_dt(value: str) -> datetime:
    """Parse an ISO-8601 string into a timezone-aware datetime.

    A trailing ``Z`` is read as UTC and a value without any offset is assumed
    to be UTC; a value carrying an explicit offset keeps that offset.

    Raises:
        ValueError: if ``datetime.fromisoformat`` rejects the string.
    """
    zulu = value.endswith("Z")
    parsed = datetime.fromisoformat(value[:-1] if zulu else value)
    if zulu or parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed
def _meeting_end(m: Meeting) -> datetime:
    """Return when meeting ``m`` ends: its parsed start plus ``duration_minutes``."""
    return _parse_dt(m.start) + timedelta(minutes=m.duration_minutes)
def _windows_overlap(a_start: datetime, a_end: datetime, b_start: datetime, b_end: datetime) -> bool:
    """Tell whether two time windows share any interior time.

    Windows that merely touch (one ends exactly when the other starts) do not
    count as overlapping.
    """
    disjoint = a_start >= b_end or b_start >= a_end
    return not disjoint
class GhostexecEnvironment(Environment):
    """Simulated GhostExec world: inbox, calendar, contacts and tasks.

    ``reset()`` loads a scenario, ``step()`` applies one agent action, and
    every observation carries a plain-text briefing plus the step reward.
    """

    # Flag for the hosting framework; presumably advertises that independent
    # sessions may run side by side — confirm against the OpenEnv server docs.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(
    self,
    scenario_path: str | Path | None = None,
    schema_drift_events_path: str | Path | None = None,
    reward_mode: str | None = None,
) -> None:
    """Configure the environment; the world itself is loaded by ``reset()``.

    Args:
        scenario_path: Scenario JSON to load on reset. An empty value or
            ``None`` selects the bundled default scenario.
        schema_drift_events_path: Optional JSON file whose ``"events"`` list
            is stored for later replay during steps. A path that is not an
            existing file is ignored.
        reward_mode: ``"full"``, ``"base"`` or ``"shaping"``. When omitted,
            the ``GHOSTEXEC_REWARD_MODE`` environment variable is used.
            Anything unrecognised falls back to ``"full"``.
    """
    if scenario_path:
        self._scenario_path = Path(scenario_path)
    else:
        self._scenario_path = _default_scenario_path()

    drift_path = None if schema_drift_events_path is None else Path(schema_drift_events_path)
    self._drift_events_path = drift_path
    self._drift_events: list[dict[str, Any]] = []
    if drift_path and drift_path.is_file():
        payload = json.loads(drift_path.read_text(encoding="utf-8"))
        self._drift_events = list(payload.get("events", []))

    # Senders for which replies earn no relationship score (filled by drift events).
    self._reply_relationship_suppressed: set[str] = set()

    module_root = Path(__file__).resolve().parent.parent
    self._reward_log_path = module_root / "outputs" / "logs" / "episode_rewards.jsonl"

    # Episode state; populated properly by reset().
    self._world: WorldState | None = None
    self._base_stress: int = 0
    self._state = State(episode_id=str(uuid4()), step_count=0)

    # Outcome of the most recent step, echoed in briefings and metadata.
    self._last_step_ok: bool = True
    self._last_step_error: str | None = None
    self._last_step_detail: str = ""
    self._last_reward_breakdown: RewardBreakdown | None = None

    requested_mode = (reward_mode or os.getenv("GHOSTEXEC_REWARD_MODE", "full")).strip().lower()
    self._reward_mode = requested_mode if requested_mode in {"full", "base", "shaping"} else "full"
| # --- lifecycle --- | |
def reset(self) -> GhostexecObservation:  # type: ignore[override]
    """Start a fresh episode from the configured scenario file.

    Reloads the world, records its initial stress as the baseline, issues a
    new episode id with a zero step count, clears per-episode bookkeeping and
    returns the opening briefing (not done, default reward).
    """
    world = self.load_world_from_json(self._scenario_path)
    self._world = world
    self._base_stress = world.stress
    self._rebuild_conflict_list()

    self._state = State(episode_id=str(uuid4()), step_count=0)
    self._last_step_ok, self._last_step_error = True, None
    self._last_step_detail = "Episode started."
    self._last_reward_breakdown = None
    self._reply_relationship_suppressed.clear()
    self._ensure_reward_log_dir()

    return self._observation_from_briefing(
        self.build_briefing_text(),
        reward=_DEFAULT_STEP_REWARD,
        done=False,
        reward_breakdown=None,
    )
def step(self, action: GhostexecAction) -> GhostexecObservation:  # type: ignore[override]
    """Apply one agent action and return the resulting observation.

    Order of work: apply any scheduled drift events, log the free-text note,
    snapshot the world, execute the action, run post-action dynamics, rebuild
    the conflict list, close the episode at the step limit, then compute and
    log the reward. Stepping a finished episode changes nothing and returns
    the invalid-action penalty with ``done=True``.
    """
    if self._world is None:
        # OpenEnv HTTP uses a new env per request; prime the world so this step still
        # runs the requested action (invalid actions get step_ok False, rewards apply).
        self.reset()
    assert self._world is not None
    live = self._world

    if not live.episode_active:
        self._last_step_ok = False
        self._last_step_error = "Episode is already finished."
        penalty = RewardBreakdown(
            final=_INVALID_ACTION_REWARD,
            invalid_step_adjustment=_INVALID_ACTION_REWARD,
        )
        self._last_reward_breakdown = penalty
        return self._observation_from_briefing(
            self.build_briefing_text(),
            reward=penalty.final,
            done=True,
            reward_breakdown=penalty,
        )

    self._state.step_count += 1
    self._maybe_apply_schema_drift_events()

    note = action.message.strip()
    if note:
        live.action_log.append(f"note: {note}")

    # Snapshot taken after drift/notes but before the action, for reward deltas.
    snapshot = self.world.model_copy(deep=True)
    succeeded = self._apply_action(action)
    self._apply_post_action_dynamics(action, action_ok=succeeded)
    self._rebuild_conflict_list()

    finished = self._state.step_count >= live.max_episode_steps
    if finished:
        live.episode_active = False
        live.episode_end_reason = live.episode_end_reason or "step_limit"

    outcome = _reward.compute_step_reward(
        snapshot,
        self.world,
        action,
        action_ok=succeeded,
        episode_done=finished,
        relationship_suppressed_for_email_to=frozenset(self._reply_relationship_suppressed),
        reward_mode=self._reward_mode,
        step_index=self._state.step_count,
        max_steps=self.world.max_episode_steps,
    )
    self._last_reward_breakdown = outcome
    self._append_reward_log(outcome, finished, action)

    return self._observation_from_briefing(
        self.build_briefing_text(),
        reward=outcome.final,
        done=finished,
        reward_breakdown=outcome,
    )
@property
def state(self) -> State:
    """Episode bookkeeping (episode id and step count) for the current run.

    Declared as a read-only property, which is how the OpenEnv ``Environment``
    interface exposes ``state``; a plain method would break attribute access.
    """
    return self._state
@property
def world(self) -> WorldState:
    """The loaded world state.

    Exposed as a property because the rest of the class reads it by attribute
    access (``self.world.emails``, ``w = self.world``); as a plain method those
    reads would receive a bound method instead of the world.

    Raises:
        RuntimeError: if no world has been loaded yet (``reset()`` not called).
    """
    if self._world is None:
        raise RuntimeError("World not initialised; call reset() first.")
    return self._world
| # --- Phase 3 briefing (plain text for LLM) --- | |
def build_briefing_text(self) -> str:
    """Render the current world as the plain-text briefing given to the agent.

    Sections, in order: a header with the simulation time, unread emails
    (first 20), meeting clashes that overlap the next 4 hours, the 5 most
    important contacts, open tasks that are overdue or due within 24 hours
    (first 15), the stress level, remaining steps, and the outcome of the
    last action when one is recorded.
    """
    world = self.world
    now = _parse_dt(world.simulation_time)

    def joined(lines: list[str], placeholder: str) -> str:
        # Empty sections show a placeholder instead of a blank block.
        return "\n".join(lines) if lines else placeholder

    # --- unread emails ---
    unread = self.get_unread_emails_sorted()
    email_lines: list[str] = []
    for mail in unread[:20]:
        relation = _REL_DISPLAY.get(mail.sender_relationship, mail.sender_relationship)
        ellipsis = "…" if len(mail.body) > 100 else ""
        preview = (mail.body[:100] + ellipsis).replace("\n", " ")
        email_lines.append(
            f"- [{mail.priority.upper()}] From: {mail.sender} ({relation}) — "
            f'"{mail.subject}"\n Preview: {preview}'
        )

    # --- calendar clashes touching the next four hours ---
    horizon = now + timedelta(hours=4)
    clash_lines: list[str] = []
    for clash in self.detect_meeting_conflicts():
        clash_start = _parse_dt(clash["overlap_start"])
        clash_end = _parse_dt(clash["overlap_end"])
        if clash_end <= now or clash_start >= horizon:
            continue
        first = self._meeting_by_id(clash["meeting_a"])
        second = self._meeting_by_id(clash["meeting_b"])
        if first and second and not (first.cancelled or second.cancelled):
            clash_lines.append(
                f"- {_fmt_meeting_line(first)} CLASHES WITH -> {_fmt_meeting_line(second)}"
            )

    # --- most important contacts ---
    ranked_contacts = sorted(world.contacts, key=lambda c: (-c.importance, c.name))
    contact_lines: list[str] = []
    for person in ranked_contacts[:5]:
        relation = _REL_DISPLAY.get(person.relationship_type, person.relationship_type)
        contact_lines.append(
            f"- {person.name}: {person.mood.upper()} — {relation}; "
            f"prefers {person.communication_preference}"
        )

    # --- open tasks that are overdue or due within a day ---
    soon = now + timedelta(hours=24)
    task_lines: list[str] = []
    for task in world.tasks:
        if task.status == "done":
            continue
        deadline = _parse_dt(task.deadline)
        if deadline > soon:
            continue
        flag = "OVERDUE" if deadline < now else "due soon"
        task_lines.append(
            f"- [{flag}] {task.description} (deadline {task.deadline}, owner {task.owner})"
        )

    steps_left = max(0, world.max_episode_steps - self._state.step_count)

    sections = [
        now.strftime("=== GHOSTEXEC BRIEFING — %a %d %b %Y %H:%M ==="),
        "",
        f"UNREAD EMAILS ({len(unread)} unread):",
        joined(email_lines, "(none)"),
        "",
        "CALENDAR CONFLICTS IN NEXT 4 HOURS:",
        joined(clash_lines, "(none in next 4 hours)"),
        "",
        "CONTACTS TO WATCH (top 5 by importance):",
        joined(contact_lines, "(none)"),
        "",
        "OVERDUE OR DUE-SOON TASKS (next 24h window):",
        joined(task_lines[:15], "(none)"),
        "",
        f"EXEC STRESS LEVEL: {world.stress}/100",
        f"STEPS REMAINING: {steps_left}",
    ]
    # An error takes precedence over a success detail.
    if self._last_step_error:
        sections.extend(["", f"LAST ACTION: ERROR — {self._last_step_error}"])
    elif self._last_step_detail:
        sections.extend(["", f"LAST ACTION: OK — {self._last_step_detail}"])
    return "\n".join(sections)
def _meeting_by_id(self, mid: str) -> Meeting | None:
    """Return the first meeting whose id equals ``mid``, or ``None`` if absent."""
    return next((mtg for mtg in self.world.meetings if mtg.id == mid), None)
| # --- scenario IO --- | |
@staticmethod
def load_world_from_json(path: str | Path) -> WorldState:
    """Read a scenario JSON file (UTF-8) and validate it into a ``WorldState``.

    Declared static: the function takes no ``self`` yet is invoked as
    ``self.load_world_from_json(path)``, which only works on a staticmethod.
    """
    raw = Path(path).read_text(encoding="utf-8")
    return WorldState.model_validate(json.loads(raw))
| def world_to_json(world: WorldState) -> str: | |
| return world.model_dump_json() | |
@staticmethod
def world_from_json(blob: str) -> WorldState:
    """Rebuild a ``WorldState`` from a JSON string via ``model_validate_json``.

    Declared static because the signature has no ``self``; as an instance
    method the environment itself would be bound to ``blob``.
    """
    return WorldState.model_validate_json(blob)
| # --- inbox --- | |
def get_unread_emails_sorted(self) -> list[Email]:
    """Return unread emails ordered by priority rank, then by id.

    Priorities missing from ``_PRIORITY_RANK`` sort last (rank 99).
    """

    def sort_key(mail):
        return _PRIORITY_RANK.get(mail.priority, 99), mail.id

    pending = [mail for mail in self.world.emails if not mail.read]
    pending.sort(key=sort_key)
    return pending
def mark_email_read(self, email_id: str) -> bool:
    """Flag the first email with ``email_id`` as read.

    The stored email is replaced by an updated copy. Returns ``True`` when a
    matching email was found, ``False`` otherwise.
    """
    inbox = self.world.emails
    for pos, mail in enumerate(inbox):
        if mail.id != email_id:
            continue
        inbox[pos] = mail.model_copy(update={"read": True})
        return True
    return False
def mark_email_replied(self, email_id: str) -> bool:
    """Flag the first email with ``email_id`` as both read and replied.

    The stored email is replaced by an updated copy. Returns ``True`` when a
    matching email was found, ``False`` otherwise.
    """
    inbox = self.world.emails
    for pos, mail in enumerate(inbox):
        if mail.id != email_id:
            continue
        inbox[pos] = mail.model_copy(update={"read": True, "replied": True})
        return True
    return False
| # --- calendar --- | |
def detect_meeting_conflicts(self) -> list[dict[str, Any]]:
    """List every pair of non-cancelled meetings whose time windows overlap.

    Each entry holds ``meeting_a``/``meeting_b`` (ids, in calendar order) and
    the shared span as ISO strings in ``overlap_start``/``overlap_end``.
    """
    windows = [
        (mtg.id, _parse_dt(mtg.start), _meeting_end(mtg))
        for mtg in self.world.meetings
        if not mtg.cancelled
    ]
    clashes: list[dict[str, Any]] = []
    for pos, (first_id, first_start, first_end) in enumerate(windows):
        # Only look at later meetings so each pair is reported once.
        for second_id, second_start, second_end in windows[pos + 1 :]:
            if not _windows_overlap(first_start, first_end, second_start, second_end):
                continue
            clashes.append(
                {
                    "meeting_a": first_id,
                    "meeting_b": second_id,
                    "overlap_start": max(first_start, second_start).isoformat(),
                    "overlap_end": min(first_end, second_end).isoformat(),
                }
            )
    return clashes
def _reschedule_causes_overlap(self, meeting_id: str, new_start_iso: str) -> bool:
    """Check whether moving a meeting to ``new_start_iso`` would clash.

    Returns ``True`` when the meeting id is unknown, or when the moved
    meeting's window would overlap any other non-cancelled meeting.
    """
    target = next((mtg for mtg in self.world.meetings if mtg.id == meeting_id), None)
    if target is None:
        return True
    moved = target.model_copy(update={"start": new_start_iso})
    moved_start = _parse_dt(moved.start)
    moved_end = _meeting_end(moved)
    return any(
        _windows_overlap(moved_start, moved_end, _parse_dt(other.start), _meeting_end(other))
        for other in self.world.meetings
        if not (other.cancelled or other.id == meeting_id)
    )
def reschedule_meeting(self, meeting_id: str, new_start_iso: str) -> bool:
    """Move the first non-cancelled meeting with ``meeting_id`` to a new start.

    On success the conflict list is rebuilt and ``True`` is returned; when no
    active meeting matches, nothing changes and ``False`` is returned.
    """
    calendar = self.world.meetings
    for pos, mtg in enumerate(calendar):
        if mtg.id != meeting_id or mtg.cancelled:
            continue
        calendar[pos] = mtg.model_copy(update={"start": new_start_iso})
        self._rebuild_conflict_list()
        return True
    return False
def cancel_meeting(self, meeting_id: str) -> bool:
    """Mark the first meeting with ``meeting_id`` as cancelled.

    On success the conflict list is rebuilt and ``True`` is returned; an
    unknown id returns ``False``. A meeting that is already cancelled still
    counts as a match.
    """
    calendar = self.world.meetings
    for pos, mtg in enumerate(calendar):
        if mtg.id != meeting_id:
            continue
        calendar[pos] = mtg.model_copy(update={"cancelled": True})
        self._rebuild_conflict_list()
        return True
    return False
def add_meeting(self, meeting: Meeting) -> None:
    """Append ``meeting`` to the calendar and rebuild the conflict list."""
    calendar = self.world.meetings
    calendar.append(meeting)
    self._rebuild_conflict_list()
| # --- contacts --- | |
def get_contact(self, name: str) -> Contact | None:
    """Return the first contact whose name equals ``name`` exactly, else ``None``."""
    return next((person for person in self.world.contacts if person.name == name), None)
def update_contact_mood(self, name: str, mood: Mood) -> bool:
    """Set the mood of the first contact named ``name``.

    The stored contact is replaced by an updated copy. Returns ``True`` when
    a matching contact was found, ``False`` otherwise.
    """
    people = self.world.contacts
    for pos, person in enumerate(people):
        if person.name != name:
            continue
        people[pos] = person.model_copy(update={"mood": mood})
        return True
    return False
| # --- tasks --- | |
def update_task_status(self, task_id: str, status: TaskStatus) -> bool:
    """Set the status of the first task with ``task_id``.

    The stored task is replaced by an updated copy. Returns ``True`` when a
    matching task was found, ``False`` otherwise.
    """
    todo = self.world.tasks
    for pos, task in enumerate(todo):
        if task.id != task_id:
            continue
        todo[pos] = task.model_copy(update={"status": status})
        return True
    return False
def overdue_tasks_at(self, simulation_iso: str) -> list[Task]:
    """Return tasks that are not done and whose deadline precedes the given time.

    Args:
        simulation_iso: ISO-8601 reference time, parsed with ``_parse_dt``.
    """
    cutoff = _parse_dt(simulation_iso)
    return [
        task
        for task in self.world.tasks
        if task.status != "done" and _parse_dt(task.deadline) < cutoff
    ]
def set_simulation_time(self, simulation_iso: str) -> None:
    """Set the simulation clock, then refresh overdue flags and the conflict list."""
    world = self.world
    world.simulation_time = simulation_iso
    self._reapply_task_overdue_flags()
    self._rebuild_conflict_list()
| # --- Phase 3 action execution --- | |
def _apply_action(self, action: GhostexecAction) -> bool:
    """Validate and execute one agent action against the world.

    Clears the last-step status fields, then dispatches on
    ``action.action_type``. A rejected action never raises: the reason is
    recorded through ``_fail`` and ``False`` is returned. This includes a
    ``reschedule_meeting`` whose ``new_time`` is not a parseable ISO-8601
    datetime, which previously escaped as ``ValueError``.

    Returns:
        ``True`` when the action was applied, ``False`` when it was rejected.
    """
    self._last_step_ok = True
    self._last_step_error = None
    self._last_step_detail = ""
    at = action.action_type

    if at == "do_nothing":
        self._last_step_detail = "No action taken."
        return True

    if at == "reply_email":
        if not action.email_id:
            return self._fail("reply_email requires email_id")
        if not any(e.id == action.email_id for e in self.world.emails):
            return self._fail(f"Unknown email_id {action.email_id!r}")
        if not action.message_body.strip():
            return self._fail("reply_email requires non-empty message_body")
        self.mark_email_replied(action.email_id)
        self._last_step_detail = f"Replied to email {action.email_id}."
        return True

    if at == "archive_email":
        if not action.email_id:
            return self._fail("archive_email requires email_id")
        if not self.mark_email_read(action.email_id):
            return self._fail(f"Unknown email_id {action.email_id!r}")
        self._last_step_detail = f"Archived (read) email {action.email_id}."
        return True

    if at == "reschedule_meeting":
        if not action.meeting_id or not action.new_time:
            return self._fail("reschedule_meeting requires meeting_id and new_time")
        if not any(m.id == action.meeting_id for m in self.world.meetings):
            return self._fail(f"Unknown meeting_id {action.meeting_id!r}")
        try:
            overlaps = self._reschedule_causes_overlap(action.meeting_id, action.new_time)
        except ValueError:
            # new_time comes from the agent and may not parse; report it as an
            # invalid action instead of letting the exception abort the step.
            return self._fail(
                f"Invalid new_time {action.new_time!r}; expected an ISO-8601 datetime"
            )
        if overlaps:
            return self._fail("Target time overlaps another active meeting.")
        if not self.reschedule_meeting(action.meeting_id, action.new_time):
            return self._fail("Could not reschedule meeting.")
        self._last_step_detail = f"Rescheduled {action.meeting_id} to {action.new_time}."
        return True

    if at == "cancel_meeting":
        if not action.meeting_id:
            return self._fail("cancel_meeting requires meeting_id")
        if not any(m.id == action.meeting_id for m in self.world.meetings):
            return self._fail(f"Unknown meeting_id {action.meeting_id!r}")
        if not self.cancel_meeting(action.meeting_id):
            return self._fail("Could not cancel meeting.")
        reason = action.reason.strip() or "(no reason given)"
        self._world.action_log.append(f"cancelled {action.meeting_id}: {reason}")
        self._last_step_detail = f"Cancelled meeting {action.meeting_id}."
        return True

    if at == "complete_task":
        if not action.task_id:
            return self._fail("complete_task requires task_id")
        task = next((x for x in self.world.tasks if x.id == action.task_id), None)
        if not task:
            return self._fail(f"Unknown task_id {action.task_id!r}")
        if task.status == "done":
            return self._fail("Task is already done.")
        self.update_task_status(action.task_id, "done")
        self._last_step_detail = f"Completed task {action.task_id}."
        return True

    if at == "delegate_task":
        delegate = action.contact_name.strip()
        if not action.task_id or not delegate:
            return self._fail("delegate_task requires task_id and contact_name")
        if not any(t.id == action.task_id for t in self.world.tasks):
            return self._fail(f"Unknown task_id {action.task_id!r}")
        if not self.get_contact(delegate):
            return self._fail(f"Unknown contact {delegate!r}")
        for i, t in enumerate(self.world.tasks):
            if t.id == action.task_id:
                # Delegation hands the task over and marks it as under way.
                self.world.tasks[i] = t.model_copy(
                    update={"delegated_to": delegate, "status": "in-progress"}
                )
                break
        self._last_step_detail = f"Delegated {action.task_id} to {delegate}."
        return True

    if at == "send_message":
        name = action.contact_name.strip()
        if not name:
            return self._fail("send_message requires contact_name")
        if not self.get_contact(name):
            return self._fail(f"Unknown contact {name!r}")
        if not action.message_body.strip():
            return self._fail("send_message requires non-empty message_body")
        # Only the first 500 characters of the body are kept in the log.
        self._world.action_log.append(f"message to {name}: {action.message_body.strip()[:500]}")
        self._last_step_detail = f"Message sent to {name}."
        return True

    return self._fail(f"Unsupported action_type {at!r}")
def _fail(self, msg: str) -> bool:
    """Record a rejected action and return ``False``.

    Stores ``msg`` as the last step error, clears the success detail and
    appends an ``error:`` entry to the world's action log.
    """
    self._last_step_ok, self._last_step_error, self._last_step_detail = False, msg, ""
    log = self._world.action_log
    log.append(f"error: {msg}")
    return False
def _advance_clock(self, minutes: int) -> None:
    """Move the simulation clock forward and refresh overdue task flags.

    The new time is stored as a naive ISO-8601 string with seconds precision,
    which ``_parse_dt`` later reads back as UTC.

    Args:
        minutes: Number of minutes to add to the current simulation time.
    """
    now = _parse_dt(self.world.simulation_time)
    # Convert to UTC *before* dropping the offset. Stripping tzinfo from a
    # time such as 10:00+05:30 would otherwise store "10:00", which is then
    # re-read as 10:00 UTC — a silent jump of the whole offset.
    advanced = (now + timedelta(minutes=minutes)).astimezone(timezone.utc)
    self.world.simulation_time = advanced.replace(tzinfo=None).isoformat(timespec="seconds")
    self._reapply_task_overdue_flags()
def _shift_contact_mood(self, name: str, delta: int) -> None:
    """Move a contact's mood ``delta`` steps along ``_MOOD_ORDER``.

    Positive deltas improve the mood, negative ones worsen it; the result is
    clamped to the ends of the scale. A zero delta or an unknown contact is a
    no-op, and the contact is only rewritten when the mood actually changes.
    """
    contact = self.get_contact(name) if delta != 0 else None
    if contact is None:
        return
    current = _MOOD_ORDER.index(contact.mood)
    highest = len(_MOOD_ORDER) - 1
    target = min(highest, max(0, current + delta))
    if target == current:
        return
    self.update_contact_mood(name, _MOOD_ORDER[target])
def _apply_post_action_dynamics(self, action: GhostexecAction, *, action_ok: bool) -> None:
    """Advance the world by one step after an action has been processed.

    The clock moves forward 20 minutes. A successful reply or message lifts
    the recipient's mood by one step; a successful cancellation lowers every
    attendee's mood by one. An invalid action or ``do_nothing`` lowers the
    mood of the sender of the first unreplied critical email. Finally, any
    overlaps that already lie in the past are noted in the action log.
    """
    # Step-level world progression adds realistic pressure dynamics while
    # remaining deterministic and learnable for policy optimization.
    self._advance_clock(minutes=20)
    now = _parse_dt(self.world.simulation_time)
    kind = action.action_type

    if action_ok:
        if kind == "reply_email":
            if action.email_id:
                replied_to = next(
                    (mail for mail in self.world.emails if mail.id == action.email_id), None
                )
                if replied_to:
                    self._shift_contact_mood(replied_to.sender, +1)
        elif kind == "send_message":
            if action.contact_name:
                self._shift_contact_mood(action.contact_name.strip(), +1)
        elif kind == "cancel_meeting":
            if action.meeting_id:
                cancelled = next(
                    (mtg for mtg in self.world.meetings if mtg.id == action.meeting_id), None
                )
                if cancelled:
                    for attendee in cancelled.attendees:
                        self._shift_contact_mood(attendee, -1)

    # Pressure escalation only on idle/invalid behavior to keep
    # action-quality separation sharp for learning.
    if kind == "do_nothing" or not action_ok:
        waiting = [
            mail for mail in self.world.emails if mail.priority == "critical" and not mail.replied
        ]
        if waiting:
            self._shift_contact_mood(waiting[0].sender, -1)

    # Overlaps between still-active meetings that already ended are treated as
    # unresolved calendar debt; here they are only recorded in the action log.
    stale_overlaps = sum(
        1
        for clash in self.detect_meeting_conflicts()
        if _parse_dt(clash["overlap_end"]) <= now
    )
    if stale_overlaps > 0:
        self.world.action_log.append(
            f"pressure: {stale_overlaps} unresolved past overlap(s) increased stress pressure."
        )
def _ensure_reward_log_dir(self) -> None:
    """Create the directory that holds the reward log, including parents, if missing."""
    log_dir = self._reward_log_path.parent
    log_dir.mkdir(parents=True, exist_ok=True)
def _append_reward_log(
    self,
    breakdown: RewardBreakdown,
    episode_done: bool,
    action: GhostexecAction,
) -> None:
    """Append one JSON line describing this step's reward to the reward log.

    The record combines episode/step identifiers, the reward breakdown fields
    and a few world counters (overlapping meeting pairs, unreplied critical
    emails, overdue tasks).
    """
    self._ensure_reward_log_dir()
    world = self.world

    # Breakdown attributes copied verbatim, in the order they are logged.
    breakdown_fields = (
        "conflict_raw",
        "critical_queue_bonus",
        "conflict",
        "relationship",
        "task",
        "weighted_base",
        "output_scale",
        "shaping_synergy",
        "shaping_tradeoff",
        "shaping_potential",
        "shaping_scaffold",
        "shaping_quality",
        "shaping_total",
        "shaping_to_base_ratio",
        "invalid_step_adjustment",
        "episode_completion_bonus",
        "catastrophic_penalty",
    )

    record: dict[str, Any] = {
        "episode_id": self._state.episode_id,
        "step": self._state.step_count,
        "action_type": action.action_type,
        "step_ok": self._last_step_ok,
        "reward": breakdown.final,
    }
    for field_name in breakdown_fields:
        record[field_name] = getattr(breakdown, field_name)
    record["episode_done"] = episode_done
    record["calendar_overlap_pairs"] = len(self.detect_meeting_conflicts())
    record["critical_unreplied"] = len(
        [mail for mail in world.emails if mail.priority == "critical" and not mail.replied]
    )
    record["overdue_tasks"] = len(self.overdue_tasks_at(world.simulation_time))
    record["reward_mode"] = self._reward_mode

    with self._reward_log_path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")
def _maybe_apply_schema_drift_events(self) -> None:
    """Apply every configured drift event scheduled for the current step.

    An event fires when its ``after_step`` equals the current step count and
    may carry any of these keys:

    * ``shift_all_meetings_hours`` — shift every meeting start by N hours.
    * ``set_contact_preference`` — ``{"name", "communication_preference"}``.
    * ``set_task_deadline`` — ``{"task_id", "deadline"}``.
    * ``suppress_reply_relationship_for_senders`` — list of sender names.
    * ``set_contact_mood`` — ``{"name", "mood"}``; unknown moods are ignored.

    Each applied change is noted in the action log, and the conflict list is
    rebuilt once if at least one event fired.
    """
    if not self._world or not self._drift_events:
        return
    world = self._world
    step = self._state.step_count
    fired = False

    for ev in self._drift_events:
        if ev.get("after_step") != step:
            continue
        fired = True

        if "shift_all_meetings_hours" in ev:
            delta = int(ev["shift_all_meetings_hours"])
            for i, m in enumerate(world.meetings):
                # Convert to UTC before dropping the offset so a start written
                # with a non-UTC offset keeps denoting the same instant once it
                # is re-read as a naive (UTC) string.
                shifted = (_parse_dt(m.start) + timedelta(hours=delta)).astimezone(timezone.utc)
                world.meetings[i] = m.model_copy(
                    update={"start": shifted.replace(tzinfo=None).isoformat(timespec="seconds")}
                )
            world.action_log.append(
                f"schema drift: shifted all meeting starts by {delta:+d} hour(s) (calendar TZ policy)."
            )

        pref = ev.get("set_contact_preference")
        if isinstance(pref, dict):
            name = str(pref.get("name", ""))
            comm = str(pref.get("communication_preference", "text"))
            for i, c in enumerate(world.contacts):
                if c.name == name:
                    world.contacts[i] = c.model_copy(
                        update={"communication_preference": comm}  # type: ignore[arg-type]
                    )
                    break
            # Logged whether or not a contact with that name exists.
            world.action_log.append(
                f"schema drift: contact {name!r} now prefers {comm} only (relationship channel change)."
            )

        td = ev.get("set_task_deadline")
        if isinstance(td, dict):
            tid = str(td.get("task_id", ""))
            dl = str(td.get("deadline", ""))
            for i, t in enumerate(world.tasks):
                if t.id == tid:
                    world.tasks[i] = t.model_copy(update={"deadline": dl})
                    break
            # Logged whether or not a task with that id exists.
            world.action_log.append(
                f"schema drift: task {tid!r} deadline moved earlier to {dl!r}."
            )

        for sender in ev.get("suppress_reply_relationship_for_senders", []) or []:
            self._reply_relationship_suppressed.add(str(sender))
            world.action_log.append(
                f"schema drift: replies to emails from {sender!r} yield zero relationship score this episode."
            )

        scm = ev.get("set_contact_mood")
        if isinstance(scm, dict):
            cname = str(scm.get("name", ""))
            mood_raw = str(scm.get("mood", "neutral"))
            allowed: tuple[Mood, ...] = ("happy", "neutral", "annoyed", "angry", "furious")
            if cname and mood_raw in allowed and self.update_contact_mood(cname, mood_raw):
                world.action_log.append(
                    f"schema drift: stakeholder {cname!r} mood is now {mood_raw} (external pressure)."
                )

    if fired:
        self._rebuild_conflict_list()
| # --- internals --- | |
def _reapply_task_overdue_flags(self) -> None:
    """Mark every unfinished task whose deadline has passed as ``"overdue"``.

    Tasks that are done are skipped; tasks already flagged are left untouched.
    """
    now = _parse_dt(self.world.simulation_time)
    todo = self.world.tasks
    for pos, task in enumerate(todo):
        if task.status == "done":
            continue
        past_due = _parse_dt(task.deadline) < now
        if past_due and task.status != "overdue":
            todo[pos] = task.model_copy(update={"status": "overdue"})
def _rebuild_conflict_list(self) -> None:
    """Recompute ``active_conflicts`` and the stress level from the world.

    One line is produced per overlapping meeting pair, followed by one per
    unreplied critical email. Stress is the episode's base stress plus two
    points per line (at most 35 extra), capped at 100.
    """
    overlap_lines = [
        f"Calendar overlap: {clash['meeting_a']} vs {clash['meeting_b']} "
        f"({clash['overlap_start']} – {clash['overlap_end']})"
        for clash in self.detect_meeting_conflicts()
    ]
    email_lines = [
        f"Unanswered critical email {mail.id}: {mail.subject}"
        for mail in self.world.emails
        if mail.priority == "critical" and not mail.replied
    ]
    conflicts = overlap_lines + email_lines
    extra_stress = min(35, 2 * len(conflicts))
    self.world.active_conflicts = conflicts
    self.world.stress = min(100, self._base_stress + extra_stress)
def _observation_from_briefing(
    self,
    briefing: str,
    reward: float,
    done: bool,
    reward_breakdown: RewardBreakdown | None = None,
) -> GhostexecObservation:
    """Wrap a briefing, reward and done flag into a ``GhostexecObservation``.

    The metadata carries world counters, the last step's status and short id
    lists for remote trainers; the reward breakdown is included when given.
    Briefings longer than 48,000 characters are cut and end with an ellipsis.
    """
    world = self.world
    unread = self.get_unread_emails_sorted()

    # Compact ids for remote trainers / Colab (briefing stays plain text).
    critical_open_ids = [
        mail.id for mail in world.emails if mail.priority == "critical" and not mail.replied
    ]
    overdue_ids = [task.id for task in self.overdue_tasks_at(world.simulation_time)]
    active_meeting_ids = [mtg.id for mtg in world.meetings if not mtg.cancelled]

    meta: dict[str, Any] = {
        "simulation_time": world.simulation_time,
        "stress": world.stress,
        "unread_email_count": len(unread),
        "calendar_conflict_pairs": len(self.detect_meeting_conflicts()),
        "episode_step": self._state.step_count,
        "max_episode_steps": world.max_episode_steps,
        "episode_active": world.episode_active,
        "episode_end_reason": world.episode_end_reason,
        "step_ok": self._last_step_ok,
        "step_error": self._last_step_error,
        "step_detail": self._last_step_detail,
        "critical_unreplied_email_ids": critical_open_ids[:12],
        "unread_email_ids": [mail.id for mail in unread[:15]],
        "overdue_task_ids": overdue_ids[:12],
        "active_meeting_ids": active_meeting_ids[:20],
    }
    if reward_breakdown is not None:
        meta["reward_breakdown"] = reward_breakdown.model_dump()

    limit = 48_000
    if len(briefing) > limit:
        # Keep the total at exactly `limit` characters including the ellipsis.
        text = briefing[: limit - 1] + "…"
    else:
        text = briefing
    return GhostexecObservation(
        echoed_message=text,
        message_length=len(text),
        done=done,
        reward=reward,
        metadata=meta,
    )
def _fmt_meeting_line(m: Meeting) -> str:
    """Format a meeting for the briefing as ``HH:MM: title (Nmin)``."""
    starts_at = _parse_dt(m.start)
    return f"{starts_at:%H:%M}: {m.title} ({m.duration_minutes}min)"