# ghostexec / server/ghostexec_environment.py
# Uploaded by modelbuilderhq using huggingface_hub (commit ee21104, verified).
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
GhostExec simulated world, agent step (Phases 2–3), and reward (Phase 4).
Scenario payloads load from scenarios/*.json. Observations are plain-text briefings.
Invalid actions return a structured error in observation metadata without raising.
Rewards aggregate conflict / relationship / task scores and log each step to outputs/logs/.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import (
Contact,
Email,
GhostexecAction,
GhostexecObservation,
Meeting,
Mood,
RewardBreakdown,
Task,
TaskStatus,
WorldState,
)
except ImportError:
from models import (
Contact,
Email,
GhostexecAction,
GhostexecObservation,
Meeting,
Mood,
RewardBreakdown,
Task,
TaskStatus,
WorldState,
)
try:
from . import reward as _reward
except ImportError:
try:
from server import reward as _reward
except ImportError:
import reward as _reward # type: ignore[no-redef]
_PRIORITY_RANK: dict[str, int] = {"critical": 0, "high": 1, "normal": 2, "low": 3}
_REL_DISPLAY: dict[str, str] = {
"board_member": "Board",
"spouse": "Spouse",
"investor": "Investor",
"direct_report": "Direct report",
"client": "Client",
"friend": "Friend",
"team_member": "Team",
}
_INVALID_ACTION_REWARD = -0.25
_DEFAULT_STEP_REWARD = 0.0
_MOOD_ORDER: tuple[Mood, ...] = ("furious", "angry", "annoyed", "neutral", "happy")
def _default_scenario_path() -> Path:
return Path(__file__).resolve().parent.parent / "scenarios" / "phase2_core.json"
def _parse_dt(value: str) -> datetime:
if value.endswith("Z"):
return datetime.fromisoformat(value[:-1]).replace(tzinfo=timezone.utc)
dt = datetime.fromisoformat(value)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
def _meeting_end(m: Meeting) -> datetime:
    """Return the moment meeting ``m`` finishes: its parsed start plus ``duration_minutes``."""
    return _parse_dt(m.start) + timedelta(minutes=m.duration_minutes)
def _windows_overlap(a_start: datetime, a_end: datetime, b_start: datetime, b_end: datetime) -> bool:
return a_start < b_end and b_start < a_end
class GhostexecEnvironment(Environment):
    """Inbox, calendar, contacts, tasks, actions, briefings, and Phase 4 rewards.

    ``reset()`` loads the world from a scenario JSON file. Each ``step()`` does the
    following:

    - applies one ``GhostexecAction``;
    - advances the simulated clock by 20 minutes;
    - rebuilds the conflict list and stress level;
    - scores the transition with the ``reward`` module;
    - appends one JSON line to ``outputs/logs/episode_rewards.jsonl``.

    Rejected actions are reported via ``step_ok`` / ``step_error`` in the
    observation metadata instead of raising.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(
        self,
        scenario_path: str | Path | None = None,
        schema_drift_events_path: str | Path | None = None,
        reward_mode: str | None = None,
    ) -> None:
        """Configure the environment; the world itself is loaded by ``reset()``.

        Args:
            scenario_path: Scenario JSON to load. Defaults to the bundled
                ``scenarios/phase2_core.json``.
            schema_drift_events_path: Optional JSON file whose ``events`` list
                holds mid-episode drift events. An event is applied when its
                ``after_step`` equals the current step count. A missing file is ignored.
            reward_mode: ``"full"``, ``"base"`` or ``"shaping"``. Falls back to the
                ``GHOSTEXEC_REWARD_MODE`` environment variable, and to ``"full"``
                for unrecognised values.
        """
        self._scenario_path = Path(scenario_path) if scenario_path else _default_scenario_path()
        self._drift_events_path = (
            Path(schema_drift_events_path) if schema_drift_events_path is not None else None
        )
        self._drift_events: list[dict[str, Any]] = []
        if self._drift_events_path and self._drift_events_path.is_file():
            drift_raw = json.loads(self._drift_events_path.read_text(encoding="utf-8"))
            self._drift_events = list(drift_raw.get("events", []))
        # Sender names whose email replies earn no relationship score this episode.
        # Filled by drift events, cleared on reset, and passed to the reward module.
        self._reply_relationship_suppressed: set[str] = set()
        self._reward_log_path = (
            Path(__file__).resolve().parent.parent / "outputs" / "logs" / "episode_rewards.jsonl"
        )
        self._world: WorldState | None = None
        # Scenario stress before conflicts; _rebuild_conflict_list adds the conflict bump on top.
        self._base_stress: int = 0
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._last_step_ok: bool = True
        self._last_step_error: str | None = None
        self._last_step_detail: str = ""
        self._last_reward_breakdown: RewardBreakdown | None = None
        self._reward_mode = (reward_mode or os.getenv("GHOSTEXEC_REWARD_MODE", "full")).strip().lower()
        if self._reward_mode not in {"full", "base", "shaping"}:
            self._reward_mode = "full"

    # --- lifecycle ---
    def reset(self) -> GhostexecObservation:  # type: ignore[override]
        """Load a fresh world from the scenario file and return the opening briefing.

        Also does the following:

        - starts a new episode id with step count 0;
        - clears drift-based reply suppression and the last-step status;
        - ensures the reward-log directory exists.

        The returned observation carries ``_DEFAULT_STEP_REWARD`` and ``done=False``.
        """
        self._world = self.load_world_from_json(self._scenario_path)
        self._base_stress = self._world.stress
        self._rebuild_conflict_list()
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._last_step_ok = True
        self._last_step_error = None
        self._last_step_detail = "Episode started."
        self._reply_relationship_suppressed.clear()
        self._last_reward_breakdown = None
        self._ensure_reward_log_dir()
        briefing = self.build_briefing_text()
        return self._observation_from_briefing(
            briefing,
            reward=_DEFAULT_STEP_REWARD,
            done=False,
            reward_breakdown=None,
        )

    def step(self, action: GhostexecAction) -> GhostexecObservation:  # type: ignore[override]
        """Apply one action, advance the world, and return the scored observation.

        Order of operations:

        1. Reset lazily if no world is loaded yet.
        2. Increment the step count.
        3. Apply the drift events scheduled for this step.
        4. Log the optional free-text note.
        5. Snapshot the world, which is used as ``before`` for the reward.
        6. Apply the action.
        7. Run post-action dynamics.
        8. Rebuild conflicts.
        9. Enforce the step limit.
        10. Compute the reward and append it to the reward log.

        Stepping an episode that has already finished returns ``done=True`` and
        ``_INVALID_ACTION_REWARD``. In that case the step counter does not advance.
        """
        if self._world is None:
            # OpenEnv HTTP uses a new env per request; prime the world so this step still
            # runs the requested action (invalid actions get step_ok False, rewards apply).
            self.reset()
        assert self._world is not None
        if not self._world.episode_active:
            self._last_step_ok = False
            self._last_step_error = "Episode is already finished."
            bd = RewardBreakdown(
                final=_INVALID_ACTION_REWARD,
                invalid_step_adjustment=_INVALID_ACTION_REWARD,
            )
            self._last_reward_breakdown = bd
            return self._observation_from_briefing(
                self.build_briefing_text(),
                reward=bd.final,
                done=True,
                reward_breakdown=bd,
            )
        self._state.step_count += 1
        self._maybe_apply_schema_drift_events()
        if action.message.strip():
            self._world.action_log.append(f"note: {action.message.strip()}")
        # Snapshot after drift so externally injected changes are part of "before"
        # and are not attributed to the agent's action by the reward module.
        before = self.world.model_copy(deep=True)
        action_ok = self._apply_action(action)
        self._apply_post_action_dynamics(action, action_ok=action_ok)
        self._rebuild_conflict_list()
        episode_done = False
        if self._state.step_count >= self._world.max_episode_steps:
            episode_done = True
            self._world.episode_active = False
            self._world.episode_end_reason = self._world.episode_end_reason or "step_limit"
        breakdown = _reward.compute_step_reward(
            before,
            self.world,
            action,
            action_ok=action_ok,
            episode_done=episode_done,
            relationship_suppressed_for_email_to=frozenset(self._reply_relationship_suppressed),
            reward_mode=self._reward_mode,
            step_index=self._state.step_count,
            max_steps=self.world.max_episode_steps,
        )
        self._last_reward_breakdown = breakdown
        self._append_reward_log(breakdown, episode_done, action)
        briefing = self.build_briefing_text()
        return self._observation_from_briefing(
            briefing,
            reward=breakdown.final,
            done=episode_done,
            reward_breakdown=breakdown,
        )

    @property
    def state(self) -> State:
        """OpenEnv episode state: episode id and step count."""
        return self._state

    @property
    def world(self) -> WorldState:
        """The current simulated world.

        Raises:
            RuntimeError: if ``reset()`` has not been called yet.
        """
        if self._world is None:
            raise RuntimeError("World not initialised; call reset() first.")
        return self._world

    # --- Phase 3 briefing (plain text for LLM) ---
    def build_briefing_text(self) -> str:
        """Render the plain-text briefing shown to the agent.

        Sections, in order:

        - up to 20 unread emails in priority order;
        - non-cancelled meeting clashes whose overlap falls in the next 4 hours;
        - the top 5 contacts by importance;
        - up to 15 open tasks that are overdue or due within 24 hours;
        - stress level and remaining steps;
        - the outcome of the last action.
        """
        w = self.world
        now = _parse_dt(w.simulation_time)
        header = now.strftime("=== GHOSTEXEC BRIEFING — %a %d %b %Y %H:%M ===")
        unread = self.get_unread_emails_sorted()
        email_lines = [
            f"- [{e.priority.upper()}] From: {e.sender} ({_REL_DISPLAY.get(e.sender_relationship, e.sender_relationship)}) — "
            f'"{e.subject}"\n Preview: {(e.body[:100] + ("…" if len(e.body) > 100 else "")).replace(chr(10), " ")}'
            for e in unread[:20]
        ]
        email_block = "\n".join(email_lines) if email_lines else "(none)"
        horizon = now + timedelta(hours=4)
        conflict_lines: list[str] = []
        for row in self.detect_meeting_conflicts():
            o0 = _parse_dt(row["overlap_start"])
            o1 = _parse_dt(row["overlap_end"])
            # Keep only overlaps that intersect the window [now, now + 4h).
            if o1 <= now or o0 >= horizon:
                continue
            ma = self._meeting_by_id(row["meeting_a"])
            mb = self._meeting_by_id(row["meeting_b"])
            if not ma or not mb or ma.cancelled or mb.cancelled:
                continue
            conflict_lines.append(
                f"- {_fmt_meeting_line(ma)} CLASHES WITH -> {_fmt_meeting_line(mb)}"
            )
        conflict_block = "\n".join(conflict_lines) if conflict_lines else "(none in next 4 hours)"
        top_contacts = sorted(w.contacts, key=lambda c: (-c.importance, c.name))[:5]
        # A separator is needed between mood and relationship; without it the
        # two run together (e.g. "ANGRYBoard").
        contact_lines = [
            f"- {c.name}: {c.mood.upper()} — {_REL_DISPLAY.get(c.relationship_type, c.relationship_type)}; "
            f"prefers {c.communication_preference}"
            for c in top_contacts
        ]
        contact_block = "\n".join(contact_lines) if contact_lines else "(none)"
        soon = now + timedelta(hours=24)
        task_lines: list[str] = []
        for t in w.tasks:
            if t.status == "done":
                continue
            dl = _parse_dt(t.deadline)
            if dl < now or (now <= dl <= soon):
                flag = "OVERDUE" if dl < now else "due soon"
                task_lines.append(f"- [{flag}] {t.description} (deadline {t.deadline}, owner {t.owner})")
        task_block = "\n".join(task_lines[:15]) if task_lines else "(none)"
        remaining = max(0, w.max_episode_steps - self._state.step_count)
        parts = [
            header,
            "",
            f"UNREAD EMAILS ({len(unread)} unread):",
            email_block,
            "",
            "CALENDAR CONFLICTS IN NEXT 4 HOURS:",
            conflict_block,
            "",
            "CONTACTS TO WATCH (top 5 by importance):",
            contact_block,
            "",
            "OVERDUE OR DUE-SOON TASKS (next 24h window):",
            task_block,
            "",
            f"EXEC STRESS LEVEL: {w.stress}/100",
            f"STEPS REMAINING: {remaining}",
        ]
        if self._last_step_error:
            parts += ["", f"LAST ACTION: ERROR — {self._last_step_error}"]
        elif self._last_step_detail:
            parts += ["", f"LAST ACTION: OK — {self._last_step_detail}"]
        return "\n".join(parts)

    def _meeting_by_id(self, mid: str) -> Meeting | None:
        """Return the first meeting with id ``mid`` (cancelled or not), or None."""
        for m in self.world.meetings:
            if m.id == mid:
                return m
        return None

    # --- scenario IO ---
    @staticmethod
    def load_world_from_json(path: str | Path) -> WorldState:
        """Read a UTF-8 scenario JSON file and validate it into a ``WorldState``."""
        raw = Path(path).read_text(encoding="utf-8")
        data = json.loads(raw)
        return WorldState.model_validate(data)

    @staticmethod
    def world_to_json(world: WorldState) -> str:
        """Serialise ``world`` to a JSON string."""
        return world.model_dump_json()

    @staticmethod
    def world_from_json(blob: str) -> WorldState:
        """Validate a JSON string into a ``WorldState`` (inverse of ``world_to_json``)."""
        return WorldState.model_validate_json(blob)

    # --- inbox ---
    def get_unread_emails_sorted(self) -> list[Email]:
        """Return unread emails ordered by priority, then by email id.

        Priority order is critical, high, normal, low; unknown priorities come last.
        """
        w = self.world
        unread = [e for e in w.emails if not e.read]
        return sorted(
            unread,
            key=lambda e: (_PRIORITY_RANK.get(e.priority, 99), e.id),
        )

    def mark_email_read(self, email_id: str) -> bool:
        """Mark the first email with ``email_id`` as read; return False if none matches."""
        for i, e in enumerate(self.world.emails):
            if e.id == email_id:
                self.world.emails[i] = e.model_copy(update={"read": True})
                return True
        return False

    def mark_email_replied(self, email_id: str) -> bool:
        """Mark the first email with ``email_id`` as read and replied; return False if none matches."""
        for i, e in enumerate(self.world.emails):
            if e.id == email_id:
                self.world.emails[i] = e.model_copy(update={"read": True, "replied": True})
                return True
        return False

    # --- calendar ---
    def detect_meeting_conflicts(self) -> list[dict[str, Any]]:
        """Return every overlapping pair of non-cancelled meetings.

        Each pair appears once, with ``meeting_a`` earlier than ``meeting_b`` in
        list order. Each entry also carries the overlap bounds as ISO strings.
        """
        active = [m for m in self.world.meetings if not m.cancelled]
        out: list[dict[str, Any]] = []
        for i, a in enumerate(active):
            a_start = _parse_dt(a.start)
            a_end = _meeting_end(a)
            for b in active[i + 1 :]:
                b_start = _parse_dt(b.start)
                b_end = _meeting_end(b)
                if _windows_overlap(a_start, a_end, b_start, b_end):
                    overlap_start = max(a_start, b_start)
                    overlap_end = min(a_end, b_end)
                    out.append(
                        {
                            "meeting_a": a.id,
                            "meeting_b": b.id,
                            "overlap_start": overlap_start.isoformat(),
                            "overlap_end": overlap_end.isoformat(),
                        }
                    )
        return out

    def _reschedule_causes_overlap(self, meeting_id: str, new_start_iso: str) -> bool:
        """Return True if moving ``meeting_id`` to ``new_start_iso`` would overlap another active meeting.

        Also returns True for unknown meeting ids.

        Raises:
            ValueError: if ``new_start_iso`` is not ISO-8601. The error comes
                from ``_parse_dt``.
        """
        idx = next((i for i, m in enumerate(self.world.meetings) if m.id == meeting_id), None)
        if idx is None:
            return True
        cand = self.world.meetings[idx].model_copy(update={"start": new_start_iso})
        c_start = _parse_dt(cand.start)
        c_end = _meeting_end(cand)
        for m in self.world.meetings:
            if m.cancelled or m.id == meeting_id:
                continue
            if _windows_overlap(c_start, c_end, _parse_dt(m.start), _meeting_end(m)):
                return True
        return False

    def reschedule_meeting(self, meeting_id: str, new_start_iso: str) -> bool:
        """Set the start of the first non-cancelled meeting with ``meeting_id``.

        Overlaps are not checked. Returns False if no such meeting exists.
        """
        for i, m in enumerate(self.world.meetings):
            if m.id == meeting_id and not m.cancelled:
                self.world.meetings[i] = m.model_copy(update={"start": new_start_iso})
                self._rebuild_conflict_list()
                return True
        return False

    def cancel_meeting(self, meeting_id: str) -> bool:
        """Mark the first meeting with ``meeting_id`` as cancelled.

        Succeeds even if the meeting is already cancelled. Returns False for
        unknown ids.
        """
        for i, m in enumerate(self.world.meetings):
            if m.id == meeting_id:
                self.world.meetings[i] = m.model_copy(update={"cancelled": True})
                self._rebuild_conflict_list()
                return True
        return False

    def add_meeting(self, meeting: Meeting) -> None:
        """Append ``meeting`` to the calendar and refresh the conflict list."""
        self.world.meetings.append(meeting)
        self._rebuild_conflict_list()

    # --- contacts ---
    def get_contact(self, name: str) -> Contact | None:
        """Return the first contact whose name equals ``name`` exactly, or None."""
        for c in self.world.contacts:
            if c.name == name:
                return c
        return None

    def update_contact_mood(self, name: str, mood: Mood) -> bool:
        """Set the mood of the first contact named ``name``; return False if not found."""
        for i, c in enumerate(self.world.contacts):
            if c.name == name:
                self.world.contacts[i] = c.model_copy(update={"mood": mood})
                return True
        return False

    # --- tasks ---
    def update_task_status(self, task_id: str, status: TaskStatus) -> bool:
        """Set the status of the first task with ``task_id``; return False if not found."""
        for i, t in enumerate(self.world.tasks):
            if t.id == task_id:
                self.world.tasks[i] = t.model_copy(update={"status": status})
                return True
        return False

    def overdue_tasks_at(self, simulation_iso: str) -> list[Task]:
        """Return the not-done tasks whose deadline is strictly before ``simulation_iso``."""
        now = _parse_dt(simulation_iso)
        out: list[Task] = []
        for t in self.world.tasks:
            if t.status in ("done",):
                continue
            if _parse_dt(t.deadline) < now:
                out.append(t)
        return out

    def set_simulation_time(self, simulation_iso: str) -> None:
        """Jump the clock to ``simulation_iso``, then re-flag overdue tasks and rebuild conflicts."""
        self.world.simulation_time = simulation_iso
        self._reapply_task_overdue_flags()
        self._rebuild_conflict_list()

    # --- Phase 3 action execution ---
    def _apply_action(self, action: GhostexecAction) -> bool:
        """Validate and execute ``action`` against the world.

        The last-step status is reset first. On success, ``_last_step_detail``
        is set and True is returned. When a check here rejects the action, the
        error is recorded via ``_fail`` (which also appends it to
        ``action_log``) and False is returned; the check does not raise.
        """
        self._last_step_ok = True
        self._last_step_error = None
        self._last_step_detail = ""
        at = action.action_type
        if at == "do_nothing":
            self._last_step_detail = "No action taken."
            return True
        if at == "reply_email":
            if not action.email_id:
                return self._fail("reply_email requires email_id")
            if not any(e.id == action.email_id for e in self.world.emails):
                return self._fail(f"Unknown email_id {action.email_id!r}")
            if not action.message_body.strip():
                return self._fail("reply_email requires non-empty message_body")
            self.mark_email_replied(action.email_id)
            self._last_step_detail = f"Replied to email {action.email_id}."
            return True
        if at == "archive_email":
            if not action.email_id:
                return self._fail("archive_email requires email_id")
            if not self.mark_email_read(action.email_id):
                return self._fail(f"Unknown email_id {action.email_id!r}")
            self._last_step_detail = f"Archived (read) email {action.email_id}."
            return True
        if at == "reschedule_meeting":
            if not action.meeting_id or not action.new_time:
                return self._fail("reschedule_meeting requires meeting_id and new_time")
            if not any(m.id == action.meeting_id for m in self.world.meetings):
                return self._fail(f"Unknown meeting_id {action.meeting_id!r}")
            # Validate up front. Otherwise a malformed timestamp makes _parse_dt
            # raise inside the overlap check, aborting step() instead of
            # reporting an invalid action.
            try:
                _parse_dt(action.new_time)
            except (TypeError, ValueError):
                return self._fail(
                    f"Invalid new_time {action.new_time!r}; expected an ISO-8601 timestamp."
                )
            if self._reschedule_causes_overlap(action.meeting_id, action.new_time):
                return self._fail("Target time overlaps another active meeting.")
            if not self.reschedule_meeting(action.meeting_id, action.new_time):
                return self._fail("Could not reschedule meeting.")
            self._last_step_detail = f"Rescheduled {action.meeting_id} to {action.new_time}."
            return True
        if at == "cancel_meeting":
            if not action.meeting_id:
                return self._fail("cancel_meeting requires meeting_id")
            target = next((m for m in self.world.meetings if m.id == action.meeting_id), None)
            if target is None:
                return self._fail(f"Unknown meeting_id {action.meeting_id!r}")
            # Re-cancelling would otherwise "succeed" and lower every attendee's mood
            # again in _apply_post_action_dynamics; reject it like re-completing a task.
            if target.cancelled:
                return self._fail("Meeting is already cancelled.")
            if not self.cancel_meeting(action.meeting_id):
                return self._fail("Could not cancel meeting.")
            reason = action.reason.strip() or "(no reason given)"
            self.world.action_log.append(f"cancelled {action.meeting_id}: {reason}")
            self._last_step_detail = f"Cancelled meeting {action.meeting_id}."
            return True
        if at == "complete_task":
            if not action.task_id:
                return self._fail("complete_task requires task_id")
            t = next((x for x in self.world.tasks if x.id == action.task_id), None)
            if not t:
                return self._fail(f"Unknown task_id {action.task_id!r}")
            if t.status == "done":
                return self._fail("Task is already done.")
            self.update_task_status(action.task_id, "done")
            self._last_step_detail = f"Completed task {action.task_id}."
            return True
        if at == "delegate_task":
            if not action.task_id or not action.contact_name.strip():
                return self._fail("delegate_task requires task_id and contact_name")
            task = next((x for x in self.world.tasks if x.id == action.task_id), None)
            if task is None:
                return self._fail(f"Unknown task_id {action.task_id!r}")
            # Delegation sets status to "in-progress", which would silently
            # un-complete a finished task.
            if task.status == "done":
                return self._fail("Task is already done.")
            assignee = action.contact_name.strip()
            if not self.get_contact(assignee):
                return self._fail(f"Unknown contact {assignee!r}")
            for i, t in enumerate(self.world.tasks):
                if t.id == action.task_id:
                    self.world.tasks[i] = t.model_copy(
                        update={
                            "delegated_to": assignee,
                            "status": "in-progress",
                        }
                    )
                    break
            self._last_step_detail = f"Delegated {action.task_id} to {assignee}."
            return True
        if at == "send_message":
            name = action.contact_name.strip()
            if not name:
                return self._fail("send_message requires contact_name")
            if not self.get_contact(name):
                return self._fail(f"Unknown contact {name!r}")
            if not action.message_body.strip():
                return self._fail("send_message requires non-empty message_body")
            # Only the first 500 characters of the message body are kept in the log.
            self.world.action_log.append(f"message to {name}: {action.message_body.strip()[:500]}")
            self._last_step_detail = f"Message sent to {name}."
            return True
        return self._fail(f"Unsupported action_type {at!r}")

    def _fail(self, msg: str) -> bool:
        """Record a failed action (status, error text, log entry) and return False."""
        self._last_step_ok = False
        self._last_step_error = msg
        self._last_step_detail = ""
        self.world.action_log.append(f"error: {msg}")
        return False

    def _advance_clock(self, minutes: int) -> None:
        """Move ``simulation_time`` forward by ``minutes`` and re-flag overdue tasks.

        The result is stored as a naive ISO string, which ``_parse_dt`` reads
        back as UTC. Any explicit offset is therefore converted to UTC before
        tzinfo is dropped.
        """
        now = _parse_dt(self.world.simulation_time)
        # Stripping a non-UTC offset without converting first would make the
        # clock jump by that offset the next time the string is parsed.
        new_now = (now + timedelta(minutes=minutes)).astimezone(timezone.utc).replace(tzinfo=None)
        self.world.simulation_time = new_now.isoformat(timespec="seconds")
        self._reapply_task_overdue_flags()

    def _shift_contact_mood(self, name: str, delta: int) -> None:
        """Move a contact's mood ``delta`` places along ``_MOOD_ORDER``.

        Positive values move toward happy; the result is clamped at both ends.
        Unknown contacts and ``delta == 0`` are no-ops.
        """
        if delta == 0:
            return
        c = self.get_contact(name)
        if c is None:
            return
        idx = _MOOD_ORDER.index(c.mood)
        next_idx = max(0, min(len(_MOOD_ORDER) - 1, idx + delta))
        if next_idx != idx:
            self.update_contact_mood(name, _MOOD_ORDER[next_idx])

    def _apply_post_action_dynamics(self, action: GhostexecAction, *, action_ok: bool) -> None:
        """Progress the world after an action.

        The clock advances 20 minutes, then contact moods shift:

        - A successful ``reply_email`` raises the sender's mood one step.
        - A successful ``send_message`` raises the recipient's mood one step.
        - A successful ``cancel_meeting`` lowers every attendee's mood one step.
        - A failed action or ``do_nothing`` lowers the mood of the sender of the
          first unreplied critical email, in inbox list order.

        Finally, overlaps that have already ended are counted and logged.
        """
        # Step-level world progression adds realistic pressure dynamics while
        # remaining deterministic and learnable for policy optimization.
        self._advance_clock(minutes=20)
        now = _parse_dt(self.world.simulation_time)
        if action_ok and action.action_type == "reply_email" and action.email_id:
            em = next((e for e in self.world.emails if e.id == action.email_id), None)
            if em:
                self._shift_contact_mood(em.sender, +1)
        if action_ok and action.action_type == "send_message" and action.contact_name:
            self._shift_contact_mood(action.contact_name.strip(), +1)
        if action_ok and action.action_type == "cancel_meeting" and action.meeting_id:
            mtg = next((m for m in self.world.meetings if m.id == action.meeting_id), None)
            if mtg:
                for attendee in mtg.attendees:
                    self._shift_contact_mood(attendee, -1)
        # Pressure escalation only on idle/invalid behavior to keep
        # action-quality separation sharp for learning.
        if (not action_ok) or action.action_type == "do_nothing":
            critical_pending = [e for e in self.world.emails if e.priority == "critical" and not e.replied]
            if critical_pending:
                self._shift_contact_mood(critical_pending[0].sender, -1)
        # Meetings that have already ended without cancellation and still overlap
        # indicate unresolved calendar debt.
        # NOTE: this only appends a log line. Stress itself is recomputed from the
        # active conflict list in _rebuild_conflict_list.
        unresolved_past_conflicts = 0
        for row in self.detect_meeting_conflicts():
            overlap_end = _parse_dt(row["overlap_end"])
            if overlap_end <= now:
                unresolved_past_conflicts += 1
        if unresolved_past_conflicts > 0:
            self.world.action_log.append(
                f"pressure: {unresolved_past_conflicts} unresolved past overlap(s) increased stress pressure."
            )

    def _ensure_reward_log_dir(self) -> None:
        """Create the reward-log directory (and parents) if it does not exist."""
        self._reward_log_path.parent.mkdir(parents=True, exist_ok=True)

    def _append_reward_log(
        self,
        breakdown: RewardBreakdown,
        episode_done: bool,
        action: GhostexecAction,
    ) -> None:
        """Append one JSON line to the reward log.

        The line holds this step's reward breakdown plus world counters:
        overlapping meeting pairs, unreplied critical emails and overdue tasks.
        """
        self._ensure_reward_log_dir()
        w = self.world
        crit_open = sum(1 for e in w.emails if e.priority == "critical" and not e.replied)
        overdue_n = len(self.overdue_tasks_at(w.simulation_time))
        line = {
            "episode_id": self._state.episode_id,
            "step": self._state.step_count,
            "action_type": action.action_type,
            "step_ok": self._last_step_ok,
            "reward": breakdown.final,
            "conflict_raw": breakdown.conflict_raw,
            "critical_queue_bonus": breakdown.critical_queue_bonus,
            "conflict": breakdown.conflict,
            "relationship": breakdown.relationship,
            "task": breakdown.task,
            "weighted_base": breakdown.weighted_base,
            "output_scale": breakdown.output_scale,
            "shaping_synergy": breakdown.shaping_synergy,
            "shaping_tradeoff": breakdown.shaping_tradeoff,
            "shaping_potential": breakdown.shaping_potential,
            "shaping_scaffold": breakdown.shaping_scaffold,
            "shaping_quality": breakdown.shaping_quality,
            "shaping_total": breakdown.shaping_total,
            "shaping_to_base_ratio": breakdown.shaping_to_base_ratio,
            "invalid_step_adjustment": breakdown.invalid_step_adjustment,
            "episode_completion_bonus": breakdown.episode_completion_bonus,
            "catastrophic_penalty": breakdown.catastrophic_penalty,
            "episode_done": episode_done,
            "calendar_overlap_pairs": len(self.detect_meeting_conflicts()),
            "critical_unreplied": crit_open,
            "overdue_tasks": overdue_n,
            "reward_mode": self._reward_mode,
        }
        with self._reward_log_path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(line) + "\n")

    def _maybe_apply_schema_drift_events(self) -> None:
        """Apply every drift event whose ``after_step`` equals the current step count.

        Supported event keys:

        - ``shift_all_meetings_hours``: shift every meeting start by N hours.
        - ``set_contact_preference``: change a contact's communication channel.
        - ``set_task_deadline``: move a task's deadline. Unparseable deadlines
          are ignored.
        - ``suppress_reply_relationship_for_senders``: zero the relationship
          score for replies to these senders.
        - ``set_contact_mood``: force a contact's mood.

        Each applied change is logged to ``action_log``. Conflicts are rebuilt if
        any event fired.
        """
        if not self._world or not self._drift_events:
            return
        step = self._state.step_count
        for ev in self._drift_events:
            if ev.get("after_step") != step:
                continue
            if "shift_all_meetings_hours" in ev:
                delta = int(ev["shift_all_meetings_hours"])
                for i, m in enumerate(self._world.meetings):
                    # Convert to UTC before dropping tzinfo; naive strings are re-read as UTC.
                    new_start = (
                        (_parse_dt(m.start) + timedelta(hours=delta))
                        .astimezone(timezone.utc)
                        .replace(tzinfo=None)
                    )
                    self._world.meetings[i] = m.model_copy(
                        update={"start": new_start.isoformat(timespec="seconds")}
                    )
                self._world.action_log.append(
                    f"schema drift: shifted all meeting starts by {delta:+d} hour(s) (calendar TZ policy)."
                )
            pref = ev.get("set_contact_preference")
            if isinstance(pref, dict):
                name = str(pref.get("name", ""))
                comm = str(pref.get("communication_preference", "text"))
                for i, c in enumerate(self._world.contacts):
                    if c.name == name:
                        self._world.contacts[i] = c.model_copy(
                            update={"communication_preference": comm}  # type: ignore[arg-type]
                        )
                        break
                self._world.action_log.append(
                    f"schema drift: contact {name!r} now prefers {comm} only (relationship channel change)."
                )
            td = ev.get("set_task_deadline")
            if isinstance(td, dict):
                tid = str(td.get("task_id", ""))
                dl = str(td.get("deadline", ""))
                try:
                    _parse_dt(dl)
                except ValueError:
                    # A malformed (or missing) deadline would later crash the briefing
                    # and overdue checks, so skip the event instead of storing it.
                    self._world.action_log.append(
                        f"schema drift: ignored invalid deadline {dl!r} for task {tid!r}."
                    )
                else:
                    for i, t in enumerate(self._world.tasks):
                        if t.id == tid:
                            self._world.tasks[i] = t.model_copy(update={"deadline": dl})
                            break
                    self._world.action_log.append(
                        f"schema drift: task {tid!r} deadline moved earlier to {dl!r}."
                    )
            for name in ev.get("suppress_reply_relationship_for_senders", []) or []:
                self._reply_relationship_suppressed.add(str(name))
                self._world.action_log.append(
                    f"schema drift: replies to emails from {name!r} yield zero relationship score this episode."
                )
            scm = ev.get("set_contact_mood")
            if isinstance(scm, dict):
                cname = str(scm.get("name", ""))
                mood_raw = str(scm.get("mood", "neutral"))
                allowed: tuple[Mood, ...] = ("happy", "neutral", "annoyed", "angry", "furious")
                if cname and mood_raw in allowed and self.update_contact_mood(cname, mood_raw):
                    self._world.action_log.append(
                        f"schema drift: stakeholder {cname!r} mood is now {mood_raw} (external pressure)."
                    )
        if any(ev.get("after_step") == step for ev in self._drift_events):
            self._rebuild_conflict_list()

    # --- internals ---
    def _reapply_task_overdue_flags(self) -> None:
        """Mark as ``overdue`` every not-done task whose deadline has passed.

        Flags are only ever added here, never removed.
        """
        now = _parse_dt(self.world.simulation_time)
        for i, t in enumerate(self.world.tasks):
            if t.status == "done":
                continue
            if _parse_dt(t.deadline) < now and t.status != "overdue":
                self.world.tasks[i] = t.model_copy(update={"status": "overdue"})

    def _rebuild_conflict_list(self) -> None:
        """Recompute ``active_conflicts`` and ``stress``.

        Conflicts are meeting overlaps plus unreplied critical emails. Stress is
        the base stress plus 2 per conflict, with the bump capped at 35 and the
        total capped at 100.
        """
        lines: list[str] = []
        for row in self.detect_meeting_conflicts():
            # Separate the two timestamps; without a separator they run together.
            lines.append(
                f"Calendar overlap: {row['meeting_a']} vs {row['meeting_b']} "
                f"({row['overlap_start']} -> {row['overlap_end']})"
            )
        for e in self.world.emails:
            if e.priority == "critical" and not e.replied:
                lines.append(f"Unanswered critical email {e.id}: {e.subject}")
        bump = min(35, len(lines) * 2)
        self.world.active_conflicts = lines
        self.world.stress = min(100, self._base_stress + bump)

    def _observation_from_briefing(
        self,
        briefing: str,
        reward: float,
        done: bool,
        reward_breakdown: RewardBreakdown | None = None,
    ) -> GhostexecObservation:
        """Wrap the briefing and compact world stats into a ``GhostexecObservation``.

        The briefing is capped at 48,000 characters; longer text is truncated and
        ends with "…". The reward breakdown is added to metadata when provided.
        """
        w = self.world
        unread_sorted = self.get_unread_emails_sorted()
        meta: dict[str, Any] = {
            "simulation_time": w.simulation_time,
            "stress": w.stress,
            "unread_email_count": sum(1 for e in w.emails if not e.read),
            "calendar_conflict_pairs": len(self.detect_meeting_conflicts()),
            "episode_step": self._state.step_count,
            "max_episode_steps": w.max_episode_steps,
            "episode_active": w.episode_active,
            "episode_end_reason": w.episode_end_reason,
            "step_ok": self._last_step_ok,
            "step_error": self._last_step_error,
            "step_detail": self._last_step_detail,
            # Compact ids for remote trainers / Colab (briefing stays plain text).
            "critical_unreplied_email_ids": [
                e.id for e in w.emails if e.priority == "critical" and not e.replied
            ][:12],
            "unread_email_ids": [e.id for e in unread_sorted[:15]],
            "overdue_task_ids": [t.id for t in self.overdue_tasks_at(w.simulation_time)][:12],
            "active_meeting_ids": [m.id for m in w.meetings if not m.cancelled][:20],
        }
        if reward_breakdown is not None:
            meta["reward_breakdown"] = reward_breakdown.model_dump()
        cap = 48_000
        text = briefing if len(briefing) <= cap else briefing[: cap - 1] + "…"
        return GhostexecObservation(
            echoed_message=text,
            message_length=len(text),
            done=done,
            reward=reward,
            metadata=meta,
        )
def _fmt_meeting_line(m: Meeting) -> str:
    """Format a meeting as ``"HH:MM: <title> (<N>min)"`` for the briefing's conflict list.

    The clock time is taken from ``m.start`` as written; it is not converted to
    another timezone.
    """
    start_hhmm = _parse_dt(m.start).strftime("%H:%M")
    return f"{start_hhmm}: {m.title} ({m.duration_minutes}min)"