|
|
""" |
|
|
Fire-Rescue MCP - Simulation Service |
|
|
|
|
|
Background service that manages simulation loop and LLM advisor evaluations. |
|
|
Designed for integration with Gradio and HTTP API endpoints. |
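
Typical usage (a minimal sketch; the keyword values below are illustrative):

    service = SimulationService(tick_interval=1.0, advisor_interval=10)
    service.start(seed=42, fire_count=4, building_count=16)
    state = service.get_state()   # thread-safe snapshot for UI rendering
    service.pause()               # stops the loop; resume() continues the same run
    service.shutdown()            # stop background threads and detach from the MCP server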
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import concurrent.futures |
|
|
import html |
|
|
import json |
|
|
import os |
|
|
import threading |
|
|
import time |
|
|
from dataclasses import dataclass, field |
|
|
from datetime import datetime |
|
|
from typing import Any, Callable, Optional |
|
|
|
|
|
from agent import ( |
|
|
AdvisorAgent, |
|
|
AdvisorResponse, |
|
|
AfterActionReport, |
|
|
AssessmentResult, |
|
|
PlanResult, |
|
|
CycleSummary, |
|
|
) |
|
|
from config import range_text |
|
|
from fire_rescue_mcp.mcp_client import LocalFastMCPClient |
|
|
from fire_rescue_mcp.mcp_server import attach_engine, detach_engine, mcp as fastmcp_server |
|
|
from models import SimulationStatus, CellType |
|
|
from simulation import SimulationEngine |
|
|
|
|
|
FIRE_COUNT_RANGE_TEXT = range_text("fire_count") |
|
|
BUILDING_COUNT_RANGE_TEXT = range_text("building_count") |
|
|
|
|
|
|
|
|
ADVISOR_MODEL_CHOICES = { |
|
|
"GPT-OSS Β· HuggingFace (openai/gpt-oss-120b)": { |
|
|
"provider": "hf", |
|
|
"model": "openai/gpt-oss-120b", |
|
|
"description": "Default OSS advisor routed through HuggingFace Inference", |
|
|
}, |
|
|
"GPT-OSS-20B Β· HuggingFace (openai/gpt-oss-20b)": { |
|
|
"provider": "hf", |
|
|
"model": "openai/gpt-oss-20b", |
|
|
"description": "OpenAI GPT-OSS 20B model via HuggingFace Inference", |
|
|
}, |
|
|
"Llama-3.1 Β· HuggingFace (meta-llama/Llama-3.1-8B-Instruct)": { |
|
|
"provider": "hf", |
|
|
"model": "meta-llama/Llama-3.1-8B-Instruct", |
|
|
"description": "Meta Llama-3.1 8B Instruct model via HuggingFace Inference", |
|
|
}, |
|
|
"OpenAI Β· gpt-5.1": { |
|
|
"provider": "openai", |
|
|
"model": "gpt-5.1", |
|
|
"description": "Flagship GPT-5.1 via native OpenAI API", |
|
|
}, |
|
|
} |
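# Each preset is consumed by set_advisor_model_choice(), which passes "provider" and "model" straight to AdvisorAgent.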
|
|
|
|
|
DEFAULT_ADVISOR_MODEL_CHOICE = "GPT-OSS · HuggingFace (openai/gpt-oss-120b)"
|
|
|
|
|
|
|
|
def generate_emoji_map(engine: SimulationEngine) -> str: |
|
|
""" |
|
|
Generate an emoji-based visualization of the current world state. |
|
|
Matches Gradio UI: 🌲Forest 🏢Building 🔥Fire 💨Smoke 🚒Truck 🚁Heli
|
|
""" |
|
|
if engine.world is None: |
|
|
return "No map available" |
|
|
|
|
|
world = engine.world |
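# Index units by coordinate so each cell can be checked for occupancy while rendering.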
|
|
|
|
|
|
|
|
unit_positions = {} |
|
|
for unit in world.units: |
|
|
key = (unit.x, unit.y) |
|
|
if key not in unit_positions: |
|
|
unit_positions[key] = [] |
|
|
unit_positions[key].append(unit.unit_type.value) |
|
|
|
|
|
|
|
|
lines = [] |
|
|
|
|
|
|
|
|
header = " " + "".join(f"{x:2}" for x in range(world.width)) |
|
|
lines.append(header) |
|
|
|
|
|
for y in range(world.height): |
|
|
row_chars = [] |
|
|
for x in range(world.width): |
|
|
cell = world.grid[y][x] |
|
|
pos = (x, y) |
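# Render priority: units first (fire trucks win over helicopters on a shared cell), then fire/smoke, then terrain.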
|
|
|
|
|
|
|
|
if pos in unit_positions: |
|
|
if "fire_truck" in unit_positions[pos]: |
|
|
row_chars.append("🚒")
|
|
else: |
|
|
row_chars.append("🚁")
|
|
elif cell.fire_intensity > 0: |
|
|
if cell.fire_intensity >= 0.1: |
|
|
row_chars.append("🔥")
|
|
else: |
|
|
row_chars.append("💨")
|
|
else: |
|
|
if cell.cell_type == CellType.BUILDING: |
|
|
row_chars.append("🏢")
|
|
elif cell.cell_type == CellType.FOREST: |
|
|
row_chars.append("🌲")
|
|
else: |
|
|
row_chars.append("⬜")
|
|
|
|
|
lines.append(f"{y:2} " + "".join(row_chars)) |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class LogEntry: |
|
|
"""A single log entry for the simulation.""" |
|
|
timestamp: str |
|
|
tick: int |
|
|
event_type: str |
|
|
message: str |
|
|
details: Optional[dict] = None |
|
|
|
|
|
def to_dict(self) -> dict: |
|
|
return { |
|
|
"timestamp": self.timestamp, |
|
|
"tick": self.tick, |
|
|
"event_type": self.event_type, |
|
|
"message": self.message, |
|
|
"details": self.details |
|
|
} |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SimulationService: |
|
|
""" |
|
|
Service that manages the simulation lifecycle and LLM advisor. |
|
|
|
|
|
Provides: |
|
|
- Background simulation loop |
|
|
- Periodic LLM advisor evaluations |
|
|
- Thread-safe state access |
|
|
- Event logging |
|
|
""" |
|
|
|
|
|
|
|
|
tick_interval: float = 1.0 |
|
|
advisor_interval: int = 10 |
|
|
|
|
|
|
|
|
engine: SimulationEngine = field(default_factory=SimulationEngine) |
|
|
advisor: AdvisorAgent = field(default_factory=AdvisorAgent) |
|
|
|
|
|
|
|
|
_running: bool = False |
|
|
_thread: Optional[threading.Thread] = None |
|
|
_lock: threading.RLock = field(default_factory=threading.RLock) |
|
|
|
|
|
|
|
|
_logs: list[LogEntry] = field(default_factory=list) |
|
|
_latest_recommendations: Optional[AdvisorResponse] = None |
|
|
_on_update: Optional[Callable] = None |
|
|
|
|
|
|
|
|
_is_thinking: bool = False |
|
|
_thinking_start_tick: int = 0 |
|
|
_current_stage: int = 0 |
|
|
_current_cycle_messages: list = field(default_factory=list) |
|
|
|
|
|
|
|
|
_advisor_history: list[dict] = field(default_factory=list) |
|
|
_cycle_summaries: list[dict] = field(default_factory=list) |
|
|
_metrics_history: list[dict] = field(default_factory=list) |
|
|
_threat_history: list[dict] = field(default_factory=list) |
|
|
_action_history: list[dict] = field(default_factory=list) |
|
|
_player_actions: list[dict] = field(default_factory=list) |
|
|
|
|
|
|
|
|
_mcp_client: LocalFastMCPClient | None = None |
|
|
_mcp_call_log: list[dict] = field(default_factory=list) |
|
|
_mcp_log_dirty: bool = False |
|
|
_last_mcp_log: str = "" |
|
|
|
|
|
|
|
|
_advisor_running: bool = False |
|
|
advisor_timeout: float = 30.0 |
|
|
advisor_max_retries: int = 3 |
|
|
|
|
|
|
|
|
_result_shown: bool = False |
|
|
_result_dismissed: bool = False |
|
|
_result_report: Optional[AfterActionReport] = None |
|
|
_result_report_status: str = "idle" |
|
|
_result_report_error: str = "" |
|
|
_last_result_signature: str = "" |
|
|
_last_result_payload_signature: str = "" |
|
|
|
|
|
|
|
|
_auto_execute: bool = True |
|
|
_executed_recommendations: set = field(default_factory=set) |
|
|
_session_owner_id: str = "" |
|
|
_attached_session_id: str = "" |
|
|
_model_choice: str = DEFAULT_ADVISOR_MODEL_CHOICE |
|
|
|
|
|
|
|
|
_tick_count: int = 0 |
|
|
_advisor_first_run: bool = True |
|
|
|
|
|
|
|
|
_last_grid_hash: str = "" |
|
|
_last_advisor_signature: tuple = field(default_factory=tuple) |
|
|
_last_history_signature: tuple = field(default_factory=tuple) |
|
|
_last_event_log: str = "" |
|
|
_last_button_states: tuple = () |
|
|
_last_result_state: str = "" |
|
|
|
|
|
def __post_init__(self): |
|
|
self._lock = threading.RLock() |
|
|
self._logs = [] |
|
|
self._result_shown = False |
|
|
self._result_dismissed = False |
|
|
self._reset_after_action_report_locked() |
|
|
self._is_thinking = False |
|
|
self._current_stage = 0 |
|
|
self._current_cycle_messages = [] |
|
|
self._advisor_history = [] |
|
|
self._cycle_summaries = [] |
|
|
self._metrics_history = [] |
|
|
self._threat_history = [] |
|
|
self._action_history = [] |
|
|
self._player_actions = [] |
|
|
self._advisor_running = False |
|
|
self._auto_execute = True |
|
|
self._executed_recommendations = set() |
|
|
self._tick_count = 0 |
|
|
self._advisor_first_run = True |
|
|
self._session_owner_id = "" |
|
|
|
|
|
self._last_grid_hash = "" |
|
|
self._last_advisor_signature = () |
|
|
self._last_history_signature = () |
|
|
self._last_event_log = "" |
|
|
self._last_button_states = (True, False) |
|
|
self._last_result_state = "" |
|
|
self._mcp_client = LocalFastMCPClient(fastmcp_server, self._record_mcp_call) |
|
|
self._mcp_call_log = [] |
|
|
self._mcp_log_dirty = False |
|
|
self._last_mcp_log = "" |
|
|
|
|
|
def start( |
|
|
self, |
|
|
seed: Optional[int] = None, |
|
|
fire_count: int = 4, |
|
|
fire_intensity: float = 0.6, |
|
|
building_count: int = 16, |
|
|
max_units: int = 10, |
|
|
session_id: Optional[str] = None, |
|
|
on_update: Optional[Callable] = None |
|
|
) -> dict: |
|
|
f""" |
|
|
Start a new simulation. |
|
|
|
|
|
Args: |
|
|
seed: Random seed for reproducibility |
|
|
fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}) |
|
|
fire_intensity: Initial fire intensity (0.0-1.0) |
|
|
building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT})
max_units: Maximum number of units that can be deployed
session_id: Identifier of the UI session that owns this run
|
|
on_update: Callback function called on state changes |
|
|
|
|
|
Returns: |
|
|
Initial world state |
|
|
""" |
|
|
|
|
|
thread = None |
|
|
with self._lock: |
|
|
if self._running: |
|
|
self._running = False |
|
|
thread = self._thread |
|
|
self._thread = None |
|
|
|
|
|
|
|
|
if thread and thread.is_alive(): |
|
|
thread.join(timeout=2.0) |
|
|
|
|
|
|
|
|
with self._lock: |
|
|
|
|
|
self._logs = [] |
|
|
self._latest_recommendations = None |
|
|
self._advisor_history = [] |
|
|
self._cycle_summaries = [] |
|
|
self._current_cycle_messages = [] |
|
|
self._current_stage = 0 |
|
|
self._is_thinking = False |
|
|
self._result_shown = False |
|
|
self._result_dismissed = False |
|
|
self._reset_after_action_report_locked() |
|
|
self._executed_recommendations = set() |
|
|
self._tick_count = 0 |
|
|
self._advisor_first_run = True |
|
|
self._on_update = on_update |
|
|
self._metrics_history = [] |
|
|
self._threat_history = [] |
|
|
self._action_history = [] |
|
|
self._player_actions = [] |
|
|
self._session_owner_id = session_id or "" |
|
|
|
|
|
self._last_grid_hash = "" |
|
|
self._last_advisor_signature = () |
|
|
self._last_history_signature = () |
|
|
self._last_event_log = "" |
|
|
self._last_button_states = (True, False) |
|
|
self._last_result_state = "" |
|
|
|
|
|
|
|
|
self.engine.reset( |
|
|
seed=seed, |
|
|
fire_count=fire_count, |
|
|
fire_intensity=fire_intensity, |
|
|
building_count=building_count, |
|
|
max_units=max_units |
|
|
) |
|
|
self._record_tick_metrics_locked(self.engine.get_state()) |
|
|
|
|
|
|
|
|
self._add_log("status", f"Simulation started: {fire_count} fires, {building_count} buildings, max {max_units} units") |
|
|
|
|
|
|
|
|
self._running = True |
|
|
self._thread = threading.Thread(target=self._simulation_loop, daemon=True) |
|
|
self._thread.start() |
|
|
|
|
|
return self._compose_state_locked() |
|
|
|
|
|
def resume(self, on_update: Optional[Callable] = None) -> dict: |
|
|
""" |
|
|
Resume a paused simulation. |
|
|
|
|
|
Returns: |
|
|
Current world state, or error if no paused simulation exists |
|
|
""" |
|
|
with self._lock: |
|
|
|
|
|
if self.engine.world is None: |
|
|
return {"status": "error", "message": "No simulation to resume"} |
|
|
|
|
|
|
|
|
if self._running: |
|
|
return {"status": "error", "message": "Simulation is already running"} |
|
|
|
|
|
|
|
|
current_status = self.engine.world.status |
|
|
if current_status in [SimulationStatus.SUCCESS, SimulationStatus.FAIL]: |
|
|
return {"status": "error", "message": f"Simulation has ended ({current_status.value})"} |
|
|
|
|
|
self._on_update = on_update |
|
|
|
|
|
|
|
|
self.engine.world.status = SimulationStatus.RUNNING |
|
|
|
|
|
|
|
|
self._add_log("status", "Simulation resumed") |
|
|
|
|
|
|
|
|
self._running = True |
|
|
self._thread = threading.Thread(target=self._simulation_loop, daemon=True) |
|
|
self._thread.start() |
|
|
|
|
|
return self._compose_state_locked() |
|
|
|
|
|
def pause(self) -> dict: |
|
|
"""Pause the simulation (can be resumed later).""" |
|
|
|
|
|
with self._lock: |
|
|
self._running = False |
|
|
thread = self._thread |
|
|
self._thread = None |
|
|
|
|
|
|
|
|
if thread and thread.is_alive(): |
|
|
thread.join(timeout=2.0) |
|
|
|
|
|
with self._lock: |
|
|
self._add_log("status", "Simulation paused") |
|
|
|
|
|
|
|
|
if self.engine.world: |
|
|
return self._compose_state_locked() |
|
|
return {"status": "idle", "after_action_report": self._get_after_action_report_payload_locked()} |
|
|
|
|
|
def is_paused(self) -> bool: |
|
|
"""Check if simulation is paused (has world but not running).""" |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
return False |
|
|
|
|
|
return ( |
|
|
not self._running |
|
|
and self.engine.world.status == SimulationStatus.RUNNING |
|
|
) |
|
|
|
|
|
def can_resume_session(self, session_id: Optional[str]) -> bool: |
|
|
"""Check if the provided session owns the paused simulation.""" |
|
|
if not session_id: |
|
|
return False |
|
|
with self._lock: |
|
|
if ( |
|
|
not self.engine.world |
|
|
or self._session_owner_id != session_id |
|
|
): |
|
|
return False |
|
|
return ( |
|
|
not self._running |
|
|
and self.engine.world.status == SimulationStatus.RUNNING |
|
|
) |
|
|
|
|
|
def _stop_internal(self): |
|
|
"""Internal stop - sets flag only (must be called with lock held).""" |
|
|
self._running = False |
|
|
|
|
|
def reset( |
|
|
self, |
|
|
seed: Optional[int] = None, |
|
|
fire_count: int = 4, |
|
|
fire_intensity: float = 0.6, |
|
|
building_count: int = 16, |
|
|
max_units: int = 10, |
|
|
session_id: Optional[str] = None, |
|
|
) -> dict: |
|
|
"""Reset simulation without starting the loop.""" |
|
|
|
|
|
thread = None |
|
|
with self._lock: |
|
|
if self._running: |
|
|
self._running = False |
|
|
thread = self._thread |
|
|
self._thread = None |
|
|
|
|
|
|
|
|
if thread and thread.is_alive(): |
|
|
thread.join(timeout=2.0) |
|
|
|
|
|
|
|
|
with self._lock: |
|
|
self._logs = [] |
|
|
self._latest_recommendations = None |
|
|
self._advisor_history = [] |
|
|
self._cycle_summaries = [] |
|
|
self._current_cycle_messages = [] |
|
|
self._current_stage = 0 |
|
|
self._is_thinking = False |
|
|
self._result_shown = False |
|
|
self._result_dismissed = False |
|
|
self._reset_after_action_report_locked() |
|
|
self._executed_recommendations = set() |
|
|
self._tick_count = 0 |
|
|
self._advisor_first_run = True |
|
|
self._metrics_history = [] |
|
|
self._threat_history = [] |
|
|
self._action_history = []
self._player_actions = []
|
|
self._session_owner_id = session_id or "" |
|
|
|
|
|
self._last_grid_hash = "" |
|
|
self._last_advisor_signature = () |
|
|
self._last_history_signature = () |
|
|
self._last_event_log = "" |
|
|
self._last_button_states = (True, False) |
|
|
self._last_result_state = "" |
|
|
|
|
|
self.engine.reset( |
|
|
seed=seed, |
|
|
fire_count=fire_count, |
|
|
fire_intensity=fire_intensity, |
|
|
building_count=building_count, |
|
|
max_units=max_units |
|
|
) |
|
|
self._record_tick_metrics_locked(self.engine.get_state()) |
|
|
self._add_log("status", f"Simulation reset: {fire_count} fires, {building_count} buildings, max {max_units} units") |
|
|
|
|
|
return self._compose_state_locked() |
|
|
|
|
|
def get_state(self) -> dict: |
|
|
"""Get current world state (thread-safe).""" |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
return { |
|
|
"status": "idle", |
|
|
"message": "No simulation running", |
|
|
"after_action_report": self._get_after_action_report_payload_locked(), |
|
|
} |
|
|
return self._compose_state_locked() |
|
|
|
|
|
def _compose_state_locked(self) -> dict: |
|
|
"""Attach after-action report payload to the current engine state.""" |
|
|
state = self.engine.get_state() |
|
|
state["after_action_report"] = self._get_after_action_report_payload_locked() |
|
|
return state |
|
|
|
|
|
def _reset_after_action_report_locked(self): |
|
|
"""Clear cached after-action report data.""" |
|
|
self._result_report = None |
|
|
self._result_report_status = "idle" |
|
|
self._result_report_error = "" |
|
|
self._last_result_signature = "" |
|
|
self._last_result_payload_signature = "" |
|
|
|
|
|
def _record_cycle_summary(self, tick: int, cycle_summary: CycleSummary, state_snapshot: Optional[dict] = None): |
|
|
"""Store Stage 4 summary for each advisor cycle, including metrics for charts.""" |
|
|
state_snapshot = state_snapshot or {} |
|
|
metrics = { |
|
|
"tick": tick, |
|
|
"fires": len(state_snapshot.get("fires", [])), |
|
|
"units": len(state_snapshot.get("units", [])), |
|
|
"max_units": state_snapshot.get("max_units", 0), |
|
|
"building_integrity": state_snapshot.get("building_integrity", 1.0), |
|
|
} |
|
|
entry = { |
|
|
"tick": tick, |
|
|
"headline": cycle_summary.headline, |
|
|
"threat_level": cycle_summary.threat_level, |
|
|
"key_highlights": cycle_summary.key_highlights, |
|
|
"risks": cycle_summary.risks, |
|
|
"next_focus": cycle_summary.next_focus, |
|
|
"metrics": metrics, |
|
|
} |
|
|
self._cycle_summaries.append(entry) |
|
|
if len(self._cycle_summaries) > 30: |
|
|
self._cycle_summaries = self._cycle_summaries[-30:] |
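# Map the advisor's threat label onto a 1-4 scale (0 = unknown) so the UI can chart threat level over time.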
|
|
threat_value_map = {"CRITICAL": 4, "HIGH": 3, "MODERATE": 2, "LOW": 1} |
|
|
value = threat_value_map.get(cycle_summary.threat_level.upper(), 0) if cycle_summary.threat_level else 0 |
|
|
self._threat_history.append({ |
|
|
"tick": tick, |
|
|
"threat_level": cycle_summary.threat_level, |
|
|
"value": value, |
|
|
}) |
|
|
if len(self._threat_history) > 120: |
|
|
self._threat_history = self._threat_history[-120:] |
|
|
|
|
|
def _record_tick_metrics_locked(self, state: Optional[dict]): |
|
|
"""Capture per-tick metrics for after-action chart visualization.""" |
|
|
if not state: |
|
|
return |
|
|
entry = { |
|
|
"tick": state.get("tick", 0), |
|
|
"fires": len(state.get("fires", [])), |
|
|
"units": len(state.get("units", [])), |
|
|
"max_units": state.get("max_units") or getattr(self.engine.world, "max_units", 0) if self.engine.world else 0, |
|
|
"building_integrity": state.get("building_integrity", 1.0), |
|
|
} |
|
|
self._metrics_history.append(entry) |
|
|
if len(self._metrics_history) > 600: |
|
|
self._metrics_history = self._metrics_history[-600:] |
|
|
|
|
|
def _record_action_breakdown(self, tick: int, deploy: int, move: int, replace: int): |
|
|
"""Track how many actions the AI recommended per tick.""" |
|
|
self._action_history.append({ |
|
|
"tick": tick, |
|
|
"deploy": deploy, |
|
|
"move": move, |
|
|
"replace": replace, |
|
|
}) |
|
|
if len(self._action_history) > 200: |
|
|
self._action_history = self._action_history[-200:] |
|
|
|
|
|
def _record_player_action(self, action: str, description: str, metadata: Optional[dict] = None): |
|
|
"""Track player-driven interventions (manual deploy/remove/fire).""" |
|
|
if metadata is None: |
|
|
metadata = {} |
|
|
tick = 0 |
|
|
if self.engine and self.engine.world: |
|
|
tick = getattr(self.engine.world, "tick", 0) |
|
|
else: |
|
|
tick = self._tick_count |
|
|
entry = { |
|
|
"tick": tick, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"action": action, |
|
|
"description": description, |
|
|
"details": dict(metadata), |
|
|
} |
|
|
self._player_actions.append(entry) |
|
|
if len(self._player_actions) > 200: |
|
|
self._player_actions = self._player_actions[-200:] |
|
|
|
|
|
def _build_player_action_context(self) -> dict: |
|
|
"""Summarize player-driven interventions for after-action reporting.""" |
|
|
actions = list(self._player_actions) |
|
|
counts = {"deploy_unit": 0, "remove_unit": 0, "add_fire": 0} |
|
|
action_meta = { |
|
|
"deploy_unit": ("π", "Deployed units"), |
|
|
"remove_unit": ("β»οΈ", "Removed units"), |
|
|
"add_fire": ("π₯", "Ignited fires"), |
|
|
} |
|
|
for entry in actions: |
|
|
action_type = entry.get("action") |
|
|
if action_type in counts: |
|
|
counts[action_type] += 1 |
|
|
total = sum(counts.values()) |
|
|
if total: |
|
|
parts = [f"Player executed {total} manual action(s)."] |
|
|
if counts["deploy_unit"]: |
|
|
parts.append(f"π Deploy: {counts['deploy_unit']} time(s)") |
|
|
if counts["remove_unit"]: |
|
|
parts.append(f"β»οΈ Remove: {counts['remove_unit']} time(s)") |
|
|
if counts["add_fire"]: |
|
|
parts.append(f"π₯ Ignite: {counts['add_fire']} time(s)") |
|
|
summary = " ".join(parts) |
|
|
else: |
|
|
summary = "Player has not manually deployed, removed, or ignited anything this run." |
|
|
recent_entries = list(reversed(actions[-6:])) |
|
|
recent = [ |
|
|
{ |
|
|
"tick": entry.get("tick", 0), |
|
|
"description": entry.get("description", ""), |
|
|
"action": entry.get("action"), |
|
|
"timestamp": entry.get("timestamp"), |
|
|
} |
|
|
for entry in recent_entries |
|
|
] |
|
|
markdown_lines = [summary] |
|
|
if recent: |
|
|
markdown_lines.append("") |
|
|
markdown_lines.append("**Recent player actions**") |
|
|
for entry in recent: |
|
|
icon = action_meta.get(entry["action"], ("📍", ""))[0]
|
|
markdown_lines.append(f"- Tick {entry['tick']} Β· {icon} {entry['description']}") |
|
|
markdown = "\n".join(markdown_lines).strip() |
|
|
return { |
|
|
"total": total, |
|
|
"counts": counts, |
|
|
"recent": recent, |
|
|
"summary": summary, |
|
|
"markdown": markdown, |
|
|
} |
|
|
|
|
|
def _record_mcp_call(self, entry: dict[str, Any]) -> None: |
|
|
"""Capture MCP tool invocations for UI display.""" |
|
|
with self._lock: |
|
|
cloned = dict(entry) |
|
|
cloned.setdefault("local_timestamp", datetime.now().strftime("%H:%M:%S")) |
|
|
self._mcp_call_log.append(cloned) |
|
|
if len(self._mcp_call_log) > 80: |
|
|
self._mcp_call_log = self._mcp_call_log[-80:] |
|
|
self._mcp_log_dirty = True |
|
|
|
|
|
def _get_mcp_log_text_locked(self, limit: int = 20) -> str: |
|
|
"""Format MCP tool logs for UI display.""" |
|
|
if not self._mcp_call_log: |
|
|
return "No MCP tool calls yet..." |
|
|
lines = [] |
|
|
for entry in self._mcp_call_log[-limit:]: |
|
|
ts = entry.get("local_timestamp", entry.get("timestamp", "")) |
|
|
tool = entry.get("tool", "unknown") |
|
|
args = entry.get("arguments", {}) |
|
|
args_preview = ", ".join(f"{k}={v}" for k, v in list(args.items())[:3]) |
|
|
result = entry.get("result", {}) |
|
|
status = result.get("status", "ok") if isinstance(result, dict) else "ok" |
|
|
duration = entry.get("duration_ms", 0) |
|
|
lines.append(f"[{ts}] {tool}({args_preview}) β {status} ({duration} ms)") |
|
|
return "\n".join(lines) |
|
|
|
|
|
def _get_after_action_report_payload_locked(self) -> dict: |
|
|
"""Serialize after-action report state for UI consumption.""" |
|
|
payload: dict = {"status": self._result_report_status} |
|
|
if self._result_report_status == "ready" and self._result_report: |
|
|
payload["report"] = self._result_report.to_dict() |
|
|
elif self._result_report_status == "error": |
|
|
payload["error"] = self._result_report_error or "Unknown error" |
|
|
return payload |
|
|
|
|
|
def _call_mcp_tool(self, name: str, **kwargs: Any) -> dict[str, Any]: |
|
|
"""Invoke a tool through the shared FastMCP client.""" |
|
|
session_id = self._session_owner_id or "default" |
|
|
if not self._mcp_client: |
|
|
self._mcp_client = LocalFastMCPClient(fastmcp_server, self._record_mcp_call) |
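# The FastMCP server binds one engine per session: drop any stale binding before attaching this service's engine.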
|
|
if self._attached_session_id and self._attached_session_id != session_id: |
|
|
detach_engine(self._attached_session_id) |
|
|
self._attached_session_id = "" |
|
|
if self._attached_session_id != session_id: |
|
|
attach_engine(self.engine, session_id) |
|
|
self._attached_session_id = session_id |
|
|
try: |
|
|
call_kwargs = dict(kwargs) |
|
|
call_kwargs["session_id"] = session_id |
|
|
return self._mcp_client.call_tool(name, **call_kwargs) |
|
|
except Exception as exc: |
|
|
self._add_log("error", f"MCP tool {name} failed: {exc}") |
|
|
return {"status": "error", "message": str(exc)} |
|
|
|
|
|
def shutdown(self) -> None: |
|
|
"""Stop background threads and detach from MCP server.""" |
|
|
thread = None |
|
|
with self._lock: |
|
|
if self._running: |
|
|
self._running = False |
|
|
thread = self._thread |
|
|
self._thread = None |
|
|
if thread and thread.is_alive(): |
|
|
thread.join(timeout=2.0) |
|
|
with self._lock: |
|
|
if self._attached_session_id: |
|
|
detach_engine(self._attached_session_id) |
|
|
self._attached_session_id = "" |
|
|
self._session_owner_id = "" |
|
|
self._on_update = None |
|
|
|
|
|
def _get_mcp_world_state(self) -> dict: |
|
|
"""Fetch world state via the MCP tools with local fallback.""" |
|
|
state = self._call_mcp_tool("get_world_state") |
|
|
if not isinstance(state, dict) or state.get("status") == "error": |
|
|
state = self.engine.get_state() |
|
|
|
|
|
def _maybe(name: str) -> dict[str, Any]: |
|
|
data = self._call_mcp_tool(name) |
|
|
return data if isinstance(data, dict) else {} |
|
|
|
|
|
state["mcp_idle_units"] = _maybe("find_idle_units") |
|
|
state["mcp_uncovered_fires"] = _maybe("find_uncovered_fires") |
|
|
state["mcp_building_threats"] = _maybe("find_building_threats") |
|
|
state["mcp_coverage"] = _maybe("analyze_coverage") |
|
|
return state |
|
|
|
|
|
def should_show_result(self) -> tuple[bool, bool]: |
|
|
""" |
|
|
Check if game result should be shown. |
|
|
|
|
|
Returns: |
|
|
tuple of (should_show, is_first_time) |
|
|
- should_show: True if popup should be displayed |
|
|
- is_first_time: True if this is the first time showing (needs render) |
|
|
""" |
|
|
with self._lock: |
|
|
|
|
|
if self._result_dismissed: |
|
|
return (False, False) |
|
|
|
|
|
state = self.engine.get_state() if self.engine.world else {"status": "idle"} |
|
|
status = state.get("status", "idle") |
|
|
|
|
|
if status in ["success", "fail"]: |
|
|
|
|
|
is_first_time = not self._result_shown |
|
|
if is_first_time: |
|
|
self._result_shown = True |
|
|
return (True, is_first_time) |
|
|
|
|
|
return (False, False) |
|
|
|
|
|
def dismiss_result(self): |
|
|
"""Dismiss the game result popup (called when player clicks it).""" |
|
|
with self._lock: |
|
|
self._result_dismissed = True |
|
|
|
|
|
def get_result_status(self) -> str: |
|
|
"""Get current result status.""" |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
return "idle" |
|
|
state = self.engine.get_state() |
|
|
return state.get("status", "idle") |
|
|
|
|
|
def _prepare_after_action_context_locked(self, outcome: str, state: dict) -> Optional[dict]: |
|
|
"""Build context for after-action report generation.""" |
|
|
signature = f"{outcome}_{state.get('tick', 0)}_{len(self._logs)}" |
|
|
if signature == self._last_result_signature: |
|
|
return None |
|
|
context = self._build_after_action_context_locked(outcome, state) |
|
|
self._result_report_status = "pending" |
|
|
self._result_report = None |
|
|
self._result_report_error = "" |
|
|
self._last_result_signature = signature |
|
|
self._last_result_payload_signature = "" |
|
|
return context |
|
|
|
|
|
def _build_after_action_context_locked(self, outcome: str, state: dict) -> dict: |
|
|
"""Assemble transcripts and metrics for the after-action LLM call.""" |
|
|
outcome_label = "Victory" if outcome == "success" else "Defeat" |
|
|
fires_remaining = len(state.get("fires", [])) |
|
|
units_active = len(state.get("units", [])) |
|
|
building_integrity = state.get("building_integrity", 1.0) |
|
|
integrity_percent = f"{building_integrity:.0%}" if isinstance(building_integrity, (int, float)) else "N/A" |
|
|
|
|
|
stage_messages = self._current_cycle_messages[:] if self._current_cycle_messages else self._advisor_history[-3:] |
|
|
transcripts = {"assessment_md": "", "planning_md": "", "execution_md": ""} |
|
|
for msg in stage_messages: |
|
|
content = msg.get("content", "") |
|
|
if not content: |
|
|
continue |
|
|
lowered = content.lower() |
|
|
if "stage 1" in lowered and not transcripts["assessment_md"]: |
|
|
transcripts["assessment_md"] = content |
|
|
elif "stage 2" in lowered and not transcripts["planning_md"]: |
|
|
transcripts["planning_md"] = content |
|
|
elif "stage 3" in lowered and not transcripts["execution_md"]: |
|
|
transcripts["execution_md"] = content |
|
|
|
|
|
summary_text = ( |
|
|
f"Tick {state.get('tick', 0)} Β· Fires {fires_remaining} Β· " |
|
|
f"Units {units_active}/{state.get('max_units', 0)} Β· Building Integrity {integrity_percent}" |
|
|
) |
|
|
|
|
|
if self._metrics_history: |
|
|
chart_points = [dict(point) for point in self._metrics_history] |
|
|
else: |
|
|
chart_points = [] |
|
|
for entry in self._cycle_summaries: |
|
|
metrics = entry.get("metrics") or {} |
|
|
if not metrics: |
|
|
continue |
|
|
chart_points.append({ |
|
|
"tick": metrics.get("tick", entry.get("tick")), |
|
|
"fires": metrics.get("fires", 0), |
|
|
"units": metrics.get("units", 0), |
|
|
"max_units": metrics.get("max_units", state.get("max_units", 0)), |
|
|
"building_integrity": metrics.get("building_integrity", building_integrity), |
|
|
}) |
|
|
|
|
|
context = { |
|
|
"outcome": outcome, |
|
|
"outcome_label": outcome_label, |
|
|
"tick": state.get("tick", 0), |
|
|
"fires_remaining": fires_remaining, |
|
|
"units_active": units_active, |
|
|
"building_integrity_percent": integrity_percent, |
|
|
"summary_text": summary_text, |
|
|
"state_snapshot": { |
|
|
"tick": state.get("tick", 0), |
|
|
"status": state.get("status", ""), |
|
|
"building_integrity": building_integrity, |
|
|
"max_units": state.get("max_units", 0), |
|
|
}, |
|
|
"cycle_summaries": list(self._cycle_summaries), |
|
|
"chart_points": chart_points, |
|
|
"threat_history": list(self._threat_history), |
|
|
"action_history": list(self._action_history), |
|
|
} |
|
|
player_actions_context = self._build_player_action_context() |
|
|
context.update(transcripts) |
|
|
context["player_actions_context"] = player_actions_context |
|
|
context["player_actions_md"] = player_actions_context.get("markdown", "") |
|
|
return context |
|
|
|
|
|
def _launch_after_action_report(self, context: dict): |
|
|
"""Run after-action report generation in the background.""" |
|
|
|
|
|
def _runner(): |
|
|
try: |
|
|
report = self.advisor.generate_after_action_report(context) |
|
|
except Exception as exc: |
|
|
with self._lock: |
|
|
self._result_report = None |
|
|
self._result_report_status = "error" |
|
|
self._result_report_error = str(exc) |
|
|
self._last_result_payload_signature = "" |
|
|
return |
|
|
|
|
|
with self._lock: |
|
|
if report and not report.error: |
|
|
self._result_report = report |
|
|
self._result_report_status = "ready" |
|
|
self._result_report_error = "" |
|
|
else: |
|
|
self._result_report = report |
|
|
self._result_report_status = "error" |
|
|
self._result_report_error = (report.error if report else "Unknown error") |
|
|
self._last_result_payload_signature = "" |
|
|
|
|
|
threading.Thread(target=_runner, daemon=True).start() |
|
|
|
|
|
def get_recommendations(self) -> Optional[dict]: |
|
|
"""Get latest advisor recommendations.""" |
|
|
with self._lock: |
|
|
if self._latest_recommendations: |
|
|
return self._latest_recommendations.to_dict() |
|
|
return None |
|
|
|
|
|
def get_logs(self, limit: int = 50) -> list[dict]: |
|
|
"""Get recent log entries.""" |
|
|
with self._lock: |
|
|
return [log.to_dict() for log in self._logs[-limit:]] |
|
|
|
|
|
def get_logs_text(self, limit: int = 20) -> str: |
|
|
"""Get all logs as formatted text for display.""" |
|
|
with self._lock: |
|
|
lines = [] |
|
|
for log in self._logs[-limit:]: |
|
|
if log.event_type == "advisor": |
|
|
lines.append(f"[Tick {log.tick}] π€ AI: {log.message}") |
|
|
if log.details and log.details.get("recommendations"): |
|
|
for rec in log.details["recommendations"]: |
|
|
target = rec.get("target", {}) |
|
|
lines.append( |
|
|
f" β {rec.get('suggested_unit_type')} at " |
|
|
f"({target.get('x')}, {target.get('y')}): {rec.get('reason', '')[:50]}" |
|
|
) |
|
|
elif log.event_type == "deploy": |
|
|
lines.append(f"[Tick {log.tick}] π Deploy: {log.message}") |
|
|
elif log.event_type == "status": |
|
|
lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
|
|
elif log.event_type == "error": |
|
|
lines.append(f"[{log.timestamp}] β οΈ Error: {log.message}") |
|
|
else: |
|
|
lines.append(f"[Tick {log.tick}] {log.message}") |
|
|
return "\n".join(lines) |
|
|
|
|
|
def get_mcp_log_text(self, limit: int = 20) -> str: |
|
|
"""Expose MCP call history.""" |
|
|
with self._lock: |
|
|
return self._get_mcp_log_text_locked(limit) |
|
|
|
|
|
def get_advisor_text(self, limit: int = 5) -> str: |
|
|
"""Get AI advisor display with rich reasoning (legacy text format).""" |
|
|
messages = self.get_advisor_messages()
|
|
if not messages: |
|
|
return "π€ AI Advisor standing by..." |
|
|
return messages[-1].get("content", "") if messages else "" |
|
|
|
|
|
def _wrap_analysis_block( |
|
|
self, |
|
|
content: str, |
|
|
default_summary: str = "AI Analysis", |
|
|
*, |
|
|
split_stage_sections: bool = False, |
|
|
open_by_default: bool = False, |
|
|
) -> str: |
|
|
"""Wrap advisor markdown in a collapsible details block.""" |
|
|
if not content: |
|
|
return content |
|
|
|
|
|
lines = content.splitlines() |
|
|
summary = default_summary |
|
|
first_value_line = "" |
|
|
auto_summary = default_summary == "AI Analysis" |
|
|
|
|
|
for line in lines: |
|
|
stripped = line.strip() |
|
|
if not stripped: |
|
|
continue |
|
|
if not first_value_line: |
|
|
first_value_line = stripped |
|
|
if auto_summary and stripped.lower().startswith("###"): |
|
|
summary = stripped.lstrip("# ").strip() |
|
|
break |
|
|
|
|
|
summary = html.escape(summary or default_summary) |
|
|
|
|
|
if auto_summary and first_value_line and first_value_line.lower().startswith("###"): |
|
|
body_lines = lines[1:] |
|
|
else: |
|
|
body_lines = lines |
|
|
body = "\n".join(body_lines).strip() |
|
|
|
|
|
if split_stage_sections: |
|
|
rendered = self._render_stage_sections(body, open_by_default) |
|
|
if rendered: |
|
|
body = rendered |
|
|
|
|
|
open_attr = " open" if open_by_default else "" |
|
|
return ( |
|
|
f"<details class=\"analysis-entry\"{open_attr}>" |
|
|
f"<summary>{summary}</summary>\n" |
|
|
f"{body}\n" |
|
|
"</details>" |
|
|
) |
|
|
|
|
|
def _render_stage_sections(self, body: str, force_open: bool = False) -> str | None: |
|
|
"""Split a multi-stage markdown block into per-stage collapsible sections.""" |
|
|
lines = body.splitlines() |
|
|
sections: list[tuple[str, list[str]]] = [] |
|
|
current_title: str | None = None |
|
|
current_lines: list[str] = [] |
|
|
|
|
|
def _flush(): |
|
|
nonlocal current_title, current_lines |
|
|
if current_title is None: |
|
|
return |
|
|
sections.append((current_title, list(current_lines))) |
|
|
current_title = None |
|
|
current_lines = [] |
|
|
|
|
|
for line in lines: |
|
|
stripped = line.strip() |
|
|
if stripped.startswith("### "): |
|
|
_flush() |
|
|
current_title = stripped.lstrip("# ").strip() |
|
|
current_lines = [] |
|
|
continue |
|
|
if current_title is None: |
|
|
|
|
|
if not stripped: |
|
|
continue |
|
|
current_title = "Details" |
|
|
current_lines.append(line) |
|
|
|
|
|
_flush() |
|
|
|
|
|
if not sections: |
|
|
return None |
|
|
|
|
|
rendered_sections: list[str] = [] |
|
|
for title, section_lines in sections: |
|
|
section_body = "\n".join(section_lines).rstrip() |
|
|
if section_body.endswith("---"): |
|
|
section_body = section_body[:-3].rstrip() |
|
|
section_body = section_body.strip() or "_No details available._" |
|
|
open_attr = " open" if force_open else "" |
|
|
rendered_sections.append( |
|
|
f"<details class=\"analysis-stage\"{open_attr}>" |
|
|
f"<summary>{html.escape(title)}</summary>\n" |
|
|
f"{section_body}\n" |
|
|
"</details>" |
|
|
) |
|
|
|
|
|
return "\n\n".join(rendered_sections) |
|
|
|
|
|
def _build_advisor_messages_locked(self) -> list[dict]: |
|
|
"""Assemble advisor messages. Caller must hold _lock.""" |
|
|
messages: list[dict] = [] |
|
|
|
|
|
|
|
|
if ( |
|
|
not self._current_cycle_messages |
|
|
and not self._advisor_history |
|
|
and not self._is_thinking |
|
|
): |
|
|
messages.append({ |
|
|
"role": "assistant", |
|
|
"content": "π Hello! I'm your AI Tactical Advisor.\n\nStart the simulation and I'll analyze the fire situation, describe my reasoning, and recommend tactical deployments.\n\nWatch me think, plan, and execute!" |
|
|
}) |
|
|
return messages |
|
|
|
|
|
current_entry = self._build_current_cycle_entry_locked() |
|
|
if current_entry: |
|
|
messages.append(current_entry) |
|
|
|
|
|
return messages |
|
|
|
|
|
def _build_current_cycle_entry_locked(self) -> Optional[dict]: |
|
|
"""Render the active cycle as a single β±οΈ Tick block (even while streaming).""" |
|
|
cycle_sections: list[str] = [] |
|
|
stage_placeholders = { |
|
|
1: ("π Stage 1 Β· Assessment", "Querying MCP tools and analyzing the situation..."), |
|
|
2: ("π― Stage 2 Β· Planning", "Formulating tactical strategy..."), |
|
|
3: ("β‘ Stage 3 Β· Execution", "Generating deployment commands via MCP..."), |
|
|
4: ("π§ Stage 4 Β· Summary", "Consolidating cycle insights..."), |
|
|
} |
|
|
current_stage_title: Optional[str] = None |
|
|
for msg in self._current_cycle_messages: |
|
|
content = msg.get("content", "") |
|
|
if content: |
|
|
cycle_sections.append(content) |
|
|
|
|
|
|
|
|
if self._is_thinking: |
|
|
tick = self._thinking_start_tick |
|
|
if tick is None and self.engine.world: |
|
|
tick = self.engine.get_state().get("tick", 0) |
|
|
title, desc = stage_placeholders.get( |
|
|
self._current_stage, |
|
|
("π€ AI Thinking", "Processing...") |
|
|
) |
|
|
current_stage_title = title |
|
|
cycle_sections.append( |
|
|
f"### {title} `[Tick {tick if tick is not None else '?'}]`\n\n{desc}" |
|
|
) |
|
|
|
|
|
if not cycle_sections: |
|
|
return None |
|
|
|
|
|
tick_label = self._thinking_start_tick |
|
|
if tick_label is None and self.engine.world: |
|
|
tick_label = self.engine.get_state().get("tick", 0) |
|
|
summary = f"β±οΈ Tick {tick_label if tick_label is not None else '?'}" |
|
|
if self._is_thinking and current_stage_title: |
|
|
summary = f"{summary} Β· {current_stage_title} β³" |
|
|
body = "\n\n".join(cycle_sections).strip() |
|
|
open_by_default = tick_label == 0 |
|
|
|
|
|
return { |
|
|
"role": "assistant", |
|
|
"content": self._wrap_analysis_block( |
|
|
body, |
|
|
summary, |
|
|
split_stage_sections=True, |
|
|
open_by_default=open_by_default, |
|
|
), |
|
|
"metadata": {"title": summary, "status": "pending" if self._is_thinking else "done"}, |
|
|
} |
|
|
|
|
|
def get_advisor_messages(self) -> list: |
|
|
"""Get current AI advisor cycle messages (progressive stage display).""" |
|
|
with self._lock: |
|
|
return self._build_advisor_messages_locked() |
|
|
|
|
|
def _build_history_messages_locked(self) -> list[dict]: |
|
|
"""Aggregate advisor history into per-tick chatbot messages.""" |
|
|
if not self._advisor_history: |
|
|
return [] |
|
|
|
|
|
history_messages: list[dict] = [] |
|
|
buffer: list[str] = [] |
|
|
tick_num = "?" |
|
|
|
|
|
def flush_cycle(): |
|
|
nonlocal buffer, tick_num |
|
|
cycle_text = "\n\n".join(buffer).strip() |
|
|
if cycle_text: |
|
|
history_messages.append({ |
|
|
"role": "assistant", |
|
|
"content": self._wrap_analysis_block( |
|
|
cycle_text, |
|
|
f"β±οΈ Tick {tick_num}", |
|
|
split_stage_sections=True, |
|
|
open_by_default=(str(tick_num).strip() == "0"), |
|
|
), |
|
|
"metadata": {"title": f"β±οΈ Tick {tick_num}", "status": "done"}, |
|
|
}) |
|
|
buffer = [] |
|
|
tick_num = "?" |
|
|
|
|
|
for msg in self._advisor_history: |
|
|
content = msg.get("content", "") |
|
|
if not content: |
|
|
continue |
|
|
buffer.append(content) |
|
|
if tick_num == "?" and "[Tick " in content: |
|
|
start = content.find("[Tick ") + 6 |
|
|
end = content.find("]", start) |
|
|
if end > start: |
|
|
tick_num = content[start:end] |
|
|
|
|
|
lowered = content.lower() |
|
|
if "stage 4" in lowered: |
|
|
flush_cycle() |
|
|
|
|
|
if buffer: |
|
|
flush_cycle() |
|
|
|
|
|
return history_messages if history_messages else [{ |
|
|
"role": "assistant", |
|
|
"content": self._wrap_analysis_block("No previous analysis cycles yet...", "π History"), |
|
|
"metadata": {"title": "π History", "status": "done"}, |
|
|
}] |
|
|
|
|
|
def get_advisor_markdown(self) -> str: |
|
|
"""Get the latest AI advisor cycle as formatted plain text.""" |
|
|
with self._lock: |
|
|
return self._get_advisor_markdown_internal() |
|
|
|
|
|
def get_advisor_history_chat_messages(self) -> list[dict]: |
|
|
"""Get advisor history formatted for chatbot display.""" |
|
|
with self._lock: |
|
|
return self._build_history_messages_locked() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_game_changes(self) -> dict: |
|
|
""" |
|
|
Check game-critical components for changes (called by game_timer every 1s). |
|
|
|
|
|
Returns a dict with: |
|
|
- state: current simulation state |
|
|
- grid_changed: bool - whether fire/unit positions changed |
|
|
- status_changed: bool - whether status bar should update |
|
|
""" |
|
|
import hashlib |
|
|
|
|
|
with self._lock: |
|
|
if self.engine.world: |
|
|
state = self._compose_state_locked() |
|
|
else: |
|
|
state = { |
|
|
"status": "idle", |
|
|
"after_action_report": self._get_after_action_report_payload_locked(), |
|
|
} |
|
|
result = {"state": state} |
|
|
|
|
|
|
|
|
grid_data = { |
|
|
"fires": sorted([(f["x"], f["y"], round(f["intensity"], 2)) for f in state.get("fires", [])]), |
|
|
"units": sorted([(u["x"], u["y"], u["type"]) for u in state.get("units", [])]), |
|
|
"buildings": sorted([(b["x"], b["y"]) for b in state.get("buildings", [])]) |
|
|
} |
|
|
current_grid_hash = hashlib.md5(json.dumps(grid_data, sort_keys=True).encode()).hexdigest() |
|
|
result["grid_changed"] = current_grid_hash != self._last_grid_hash |
|
|
if result["grid_changed"]: |
|
|
self._last_grid_hash = current_grid_hash |
|
|
|
|
|
|
|
|
status = state.get("status", "idle") |
|
|
is_running = self._running |
|
|
result["status_changed"] = is_running and status == "running" |
|
|
|
|
|
return result |
|
|
|
|
|
def get_ui_changes(self) -> dict: |
|
|
""" |
|
|
Check UI panel components for changes (called by ui_timer every 2s). |
|
|
|
|
|
Returns a dict with: |
|
|
- state: current simulation state |
|
|
- advisor_changed: bool - whether advisor messages changed |
|
|
- advisor_messages: list[dict] | None - new content if changed |
|
|
- history_changed: bool - whether advisor history messages changed
|
|
- advisor_history: list[dict] | None - new content if changed
|
|
- event_log_changed: bool - whether event log changed |
|
|
- event_log: str or None - new content if changed
- mcp_log_changed: bool - whether the MCP tool-call log changed
- mcp_log: str or None - new content if changed
|
|
- buttons_changed: bool - whether button states changed |
|
|
- button_states: tuple (start_enabled, pause_enabled) |
|
|
- result_changed: bool - whether result popup changed |
|
|
- result_state: str - current result state |
|
|
""" |
|
|
with self._lock: |
|
|
if self.engine.world: |
|
|
state = self._compose_state_locked() |
|
|
else: |
|
|
state = { |
|
|
"status": "idle", |
|
|
"after_action_report": self._get_after_action_report_payload_locked(), |
|
|
} |
|
|
result = {"state": state} |
|
|
|
|
|
|
|
|
advisor_messages = self._get_advisor_chat_messages_internal() |
|
|
signature = tuple(msg.get("content", "") for msg in advisor_messages) |
|
|
content_changed = signature != self._last_advisor_signature |
|
|
result["advisor_changed"] = self._is_thinking or content_changed |
|
|
result["advisor_messages"] = advisor_messages if result["advisor_changed"] else None |
|
|
if content_changed: |
|
|
self._last_advisor_signature = signature |
|
|
|
|
|
|
|
|
history_messages = self._build_history_messages_locked() |
|
|
history_signature = tuple(msg.get("content", "") for msg in history_messages) |
|
|
result["history_changed"] = history_signature != self._last_history_signature |
|
|
result["advisor_history"] = history_messages if result["history_changed"] else None |
|
|
if result["history_changed"]: |
|
|
self._last_history_signature = history_signature |
|
|
|
|
|
|
|
|
event_log = self._get_event_log_internal() |
|
|
result["event_log_changed"] = event_log != self._last_event_log |
|
|
result["event_log"] = event_log if result["event_log_changed"] else None |
|
|
if result["event_log_changed"]: |
|
|
self._last_event_log = event_log |
|
|
|
|
|
|
|
|
if self._mcp_log_dirty: |
|
|
mcp_log = self._get_mcp_log_text_locked() |
|
|
result["mcp_log_changed"] = True |
|
|
result["mcp_log"] = mcp_log |
|
|
self._last_mcp_log = mcp_log |
|
|
self._mcp_log_dirty = False |
|
|
else: |
|
|
result["mcp_log_changed"] = False |
|
|
result["mcp_log"] = None |
|
|
|
|
|
|
|
|
status = state.get("status", "idle") |
|
|
is_running = self._running |
|
|
is_paused = ( |
|
|
self.engine.world is not None |
|
|
and not self._running |
|
|
and self.engine.world.status == SimulationStatus.RUNNING |
|
|
) |
|
|
start_enabled = is_paused or (not is_running and status in ["idle", "success", "fail"]) |
|
|
pause_enabled = is_running and status == "running" |
|
|
current_buttons = (start_enabled, pause_enabled) |
|
|
result["buttons_changed"] = current_buttons != self._last_button_states |
|
|
result["button_states"] = current_buttons |
|
|
if result["buttons_changed"]: |
|
|
self._last_button_states = current_buttons |
|
|
|
|
|
|
|
|
if self._result_dismissed: |
|
|
current_result = "" |
|
|
elif status in ["success", "fail"]: |
|
|
current_result = status |
|
|
else: |
|
|
current_result = "" |
|
|
result["result_changed"] = current_result != self._last_result_state |
|
|
result["result_state"] = current_result |
|
|
if result["result_changed"]: |
|
|
self._last_result_state = current_result |
|
|
|
|
|
report_payload = self._get_after_action_report_payload_locked() |
|
|
overlay_payload = { |
|
|
"outcome": current_result, |
|
|
"after_action": report_payload, |
|
|
} |
|
|
payload_signature = json.dumps(overlay_payload, sort_keys=True) |
|
|
if payload_signature != self._last_result_payload_signature: |
|
|
result["result_changed"] = True |
|
|
self._last_result_payload_signature = payload_signature |
|
|
result["result_payload"] = overlay_payload |
|
|
|
|
|
return result |
|
|
|
|
|
def get_changed_components(self) -> dict: |
|
|
""" |
|
|
Legacy function - combines game and UI changes. |
|
|
Used by button click handlers for full refresh. |
|
|
""" |
|
|
game = self.get_game_changes() |
|
|
ui = self.get_ui_changes() |
|
|
|
|
|
|
|
|
return { |
|
|
"state": game["state"], |
|
|
"grid_changed": game["grid_changed"], |
|
|
"status_changed": game["status_changed"], |
|
|
"advisor_changed": ui["advisor_changed"], |
|
|
"advisor_messages": ui["advisor_messages"], |
|
|
"history_changed": ui["history_changed"], |
|
|
"advisor_history": ui["advisor_history"], |
|
|
"event_log_changed": ui["event_log_changed"], |
|
|
"event_log": ui["event_log"], |
|
|
"mcp_log_changed": ui["mcp_log_changed"], |
|
|
"mcp_log": ui["mcp_log"], |
|
|
"buttons_changed": ui["buttons_changed"], |
|
|
"button_states": ui["button_states"], |
|
|
"result_changed": ui["result_changed"], |
|
|
"result_state": ui["result_state"], |
|
|
"result_payload": ui.get("result_payload"), |
|
|
} |
|
|
|
|
|
def _get_advisor_markdown_internal(self) -> str: |
|
|
"""Internal helper to flatten advisor messages into text.""" |
|
|
messages = self._build_advisor_messages_locked() |
|
|
parts = [msg.get("content", "") for msg in messages if msg.get("content")] |
|
|
return "\n\n---\n\n".join(parts) if parts else "Waiting for analysis..." |
|
|
|
|
|
def _get_advisor_chat_messages_internal(self) -> list[dict]: |
|
|
"""Internal helper to normalize advisor messages for Chatbot display.""" |
|
|
messages = self._build_advisor_messages_locked() |
|
|
chat_messages: list[dict] = [] |
|
|
for msg in messages: |
|
|
role = msg.get("role", "assistant") |
|
|
if role not in ("user", "assistant", "system"): |
|
|
role = "assistant" |
|
|
chat_msg = { |
|
|
"role": role, |
|
|
"content": msg.get("content", ""), |
|
|
} |
|
|
metadata = msg.get("metadata") |
|
|
if metadata: |
|
|
chat_msg["metadata"] = metadata |
|
|
options = msg.get("options") |
|
|
if options: |
|
|
chat_msg["options"] = options |
|
|
chat_messages.append(chat_msg) |
|
|
return chat_messages |
|
|
|
|
|
def get_advisor_chat_messages(self) -> list[dict]: |
|
|
"""Public helper for UI components that expect message dictionaries.""" |
|
|
with self._lock: |
|
|
return self._get_advisor_chat_messages_internal() |
|
|
|
|
|
def _get_event_log_internal(self, limit: int = 15) -> str: |
|
|
"""Internal method to get event log (must be called with lock held).""" |
|
|
lines = [] |
|
|
event_logs = [log for log in self._logs if log.event_type != "advisor"] |
|
|
for log in event_logs[-limit:]: |
|
|
if log.event_type == "deploy": |
|
|
lines.append(f"[Tick {log.tick}] π {log.message}") |
|
|
elif log.event_type == "status": |
|
|
lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
|
|
elif log.event_type == "error": |
|
|
lines.append(f"[{log.timestamp}] β οΈ {log.message}") |
|
|
else: |
|
|
lines.append(f"[Tick {log.tick}] {log.message}") |
|
|
return "\n".join(lines) if lines else "No events yet..." |
|
|
|
|
|
def get_event_log_text(self, limit: int = 15) -> str: |
|
|
"""Get event logs (deploy, status, error) without AI advisor.""" |
|
|
with self._lock: |
|
|
lines = [] |
|
|
event_logs = [log for log in self._logs if log.event_type != "advisor"] |
|
|
for log in event_logs[-limit:]: |
|
|
if log.event_type == "deploy": |
|
|
lines.append(f"[Tick {log.tick}] π {log.message}") |
|
|
elif log.event_type == "status": |
|
|
lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
|
|
elif log.event_type == "error": |
|
|
lines.append(f"[{log.timestamp}] β οΈ {log.message}") |
|
|
else: |
|
|
lines.append(f"[Tick {log.tick}] {log.message}") |
|
|
return "\n".join(lines) if lines else "No events yet..." |
|
|
|
|
|
def get_deploy_log_text(self, limit: int = 10) -> str: |
|
|
"""Get deploy-related logs only (deploy success and errors).""" |
|
|
with self._lock: |
|
|
lines = [] |
|
|
deploy_logs = [log for log in self._logs if log.event_type in ["deploy", "error"]] |
|
|
for log in deploy_logs[-limit:]: |
|
|
if log.event_type == "deploy": |
|
|
lines.append(f"[Tick {log.tick}] β
{log.message}") |
|
|
elif log.event_type == "error": |
|
|
lines.append(f"[Tick {log.tick}] β {log.message}") |
|
|
return "\n".join(lines) if lines else "Click on a cell to deploy units..." |
|
|
|
|
|
def deploy_unit(self, unit_type: str, x: int, y: int, source: str = "player") -> dict: |
|
|
"""Deploy a unit (thread-safe).""" |
|
|
with self._lock: |
|
|
result = self._call_mcp_tool("deploy_unit", unit_type=unit_type, x=x, y=y, source=source) |
|
|
|
|
|
if result.get("status") == "ok": |
|
|
self._add_log( |
|
|
"deploy", |
|
|
f"Deployed {unit_type} at ({x}, {y})", |
|
|
{"unit": result.get("unit"), "source": source} |
|
|
) |
|
|
if str(source or "").startswith("player"): |
|
|
unit_label = "fire truck" if unit_type == "fire_truck" else "helicopter" |
|
|
self._record_player_action( |
|
|
"deploy_unit", |
|
|
f"Deployed {unit_label} at ({x}, {y})", |
|
|
{"unit_type": unit_type, "x": x, "y": y} |
|
|
) |
|
|
else: |
|
|
self._add_log( |
|
|
"error", |
|
|
f"Failed to deploy {unit_type}: {result.get('message')}" |
|
|
) |
|
|
|
|
|
return result |
|
|
|
|
|
def remove_unit(self, x: int, y: int) -> dict: |
|
|
"""Remove a unit at position (thread-safe).""" |
|
|
with self._lock: |
|
|
result = self._call_mcp_tool("remove_unit", x=x, y=y) |
|
|
|
|
|
if result.get("status") == "ok": |
|
|
self._add_log( |
|
|
"deploy", |
|
|
f"Removed unit at ({x}, {y})", |
|
|
{"unit": result.get("unit")} |
|
|
) |
|
|
removed_unit = result.get("unit") or { |
|
|
"type": result.get("removed_unit_type"), |
|
|
**(result.get("position") or {"x": x, "y": y}), |
|
|
} |
|
|
unit_type = removed_unit.get("type") or result.get("removed_unit_type", "") |
|
|
unit_label = ( |
|
|
"fire truck" |
|
|
if unit_type == "fire_truck" |
|
|
else "helicopter" |
|
|
if unit_type == "helicopter" |
|
|
else "unit" |
|
|
) |
|
|
self._record_player_action( |
|
|
"remove_unit", |
|
|
f"Removed {unit_label} at ({removed_unit.get('x', x)}, {removed_unit.get('y', y)})", |
|
|
{"unit": removed_unit} |
|
|
) |
|
|
|
|
|
return result |
|
|
|
|
|
def add_fire(self, x: int, y: int, intensity: float = 0.5) -> dict: |
|
|
"""Add fire at position (thread-safe). For testing purposes.""" |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
return {"status": "error", "message": "World not initialized"} |
|
|
|
|
|
|
|
|
if not (0 <= x < self.engine.world.width and 0 <= y < self.engine.world.height): |
|
|
return {"status": "error", "message": f"Position ({x}, {y}) out of bounds"} |
|
|
|
|
|
cell = self.engine.world.grid[y][x] |
|
|
|
|
|
|
|
|
if cell.cell_type not in (CellType.FOREST, CellType.BUILDING): |
|
|
return {"status": "error", "message": "Fire can only be placed on forest or building"} |
|
|
|
|
|
|
|
|
if cell.fire_intensity > 0: |
|
|
return {"status": "error", "message": "Cannot place fire on existing fire or smoke"} |
|
|
|
|
|
|
|
|
for unit in self.engine.world.units: |
|
|
if unit.x == x and unit.y == y: |
|
|
return {"status": "error", "message": "Cannot place fire where a unit exists"} |
|
|
|
|
|
|
|
|
old_intensity = cell.fire_intensity |
|
|
cell.fire_intensity = min(1.0, max(0.0, intensity)) |
|
|
|
|
|
|
|
|
self.engine.world.calculate_metrics() |
|
|
|
|
|
self._add_log( |
|
|
"fire", |
|
|
f"π₯ Added fire at ({x}, {y}) with intensity {int(intensity * 100)}%", |
|
|
{"x": x, "y": y, "intensity": cell.fire_intensity, "old_intensity": old_intensity} |
|
|
) |
|
|
result = {"status": "ok", "x": x, "y": y, "intensity": cell.fire_intensity} |
|
|
self._record_player_action( |
|
|
"add_fire", |
|
|
f"Ignited fire at ({x}, {y}) Β· intensity {int(cell.fire_intensity * 100)}%", |
|
|
result |
|
|
) |
|
|
return result |
|
|
|
|
|
def has_unit_at(self, x: int, y: int) -> bool: |
|
|
"""Check if there's a unit at position.""" |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
return False |
|
|
for unit in self.engine.world.units: |
|
|
if unit.x == x and unit.y == y: |
|
|
return True |
|
|
return False |
|
|
|
|
|
def is_running(self) -> bool: |
|
|
"""Check if simulation is running.""" |
|
|
return self._running |
|
|
|
|
|
def set_auto_execute(self, enabled: bool): |
|
|
"""Set whether to automatically execute AI recommendations.""" |
|
|
with self._lock: |
|
|
self._auto_execute = enabled |
|
|
|
|
|
def is_auto_execute(self) -> bool: |
|
|
"""Check if auto-execute is enabled.""" |
|
|
return self._auto_execute |
|
|
|
|
|
def get_advisor_model_choice(self) -> str: |
|
|
"""Return the currently selected advisor model label.""" |
|
|
with self._lock: |
|
|
return self._model_choice |
|
|
|
|
|
def set_advisor_model_choice(self, choice: str) -> dict: |
|
|
""" |
|
|
Switch the advisor backend/model based on UI selection. |
|
|
Returns status dict usable by the UI for feedback. |
|
|
""" |
|
|
preset = ADVISOR_MODEL_CHOICES.get(choice) |
|
|
if not preset: |
|
|
return {"status": "error", "message": "Unknown model selection."} |
|
|
|
|
|
if preset["provider"] == "openai" and not os.getenv("OPENAI_API_KEY"): |
|
|
return { |
|
|
"status": "error", |
|
|
"message": "Please set OPENAI_API_KEY before selecting an OpenAI model.", |
|
|
} |
|
|
|
|
|
with self._lock: |
|
|
self.advisor = AdvisorAgent( |
|
|
provider=preset["provider"], |
|
|
model=preset["model"], |
|
|
) |
|
|
self._model_choice = choice |
|
|
self._add_log("status", f"Advisor model switched to {choice}") |
|
|
|
|
|
return {"status": "ok", "selection": choice} |
|
|
|
|
|
def reset_advisor_model_choice(self) -> str: |
|
|
"""Reset advisor selection back to the default preset.""" |
|
|
default_choice = DEFAULT_ADVISOR_MODEL_CHOICE |
|
|
preset = ADVISOR_MODEL_CHOICES[default_choice] |
|
|
with self._lock: |
|
|
self.advisor = AdvisorAgent( |
|
|
provider=preset["provider"], |
|
|
model=preset["model"], |
|
|
) |
|
|
self._model_choice = default_choice |
|
|
self._advisor_first_run = True |
|
|
self._add_log("status", f"Advisor model reset to {default_choice}") |
|
|
return default_choice |
|
|
|
|
|
def is_thinking(self) -> bool: |
|
|
"""Check if AI advisor is currently thinking.""" |
|
|
return self._is_thinking |
|
|
|
|
|
def get_thinking_stage(self) -> int: |
|
|
"""Get current AI thinking stage (0=idle, 1=tool_call, 2=assess, 3=plan, 4=execute).""" |
|
|
return self._current_stage if self._is_thinking else 0 |
|
|
|
|
|
def _simulation_loop(self): |
|
|
"""Background simulation loop.""" |
|
|
|
|
|
after_action_context = None |
|
|
|
|
|
while self._running: |
|
|
sim_should_stop = False |
|
|
try: |
|
|
with self._lock: |
|
|
if self.engine.world is None: |
|
|
break |
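# On the very first loop iteration, run the advisor immediately instead of waiting for the interval.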
|
|
|
|
|
|
|
|
if self._advisor_first_run: |
|
|
self._advisor_first_run = False |
|
|
self._run_advisor(self._get_mcp_world_state()) |
|
|
|
|
|
|
|
|
self.engine.step() |
|
|
self._tick_count += 1 |
|
|
|
|
|
|
|
|
state = self.engine.get_state() |
|
|
self._record_tick_metrics_locked(state) |
|
|
status = state.get("status", "running") |
|
|
|
|
|
if status in ["success", "fail"]: |
|
|
self._add_log( |
|
|
"status", |
|
|
"π SUCCESS! Fire contained!" if status == "success" else "π₯ FAILED! Too much damage!" |
|
|
) |
|
|
after_action_context = self._prepare_after_action_context_locked(status, state) |
|
|
self._running = False |
|
|
sim_should_stop = True |
|
|
else: |
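# While the incident is still active, re-run the advisor every advisor_interval ticks.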
|
|
|
|
|
if self._tick_count % self.advisor_interval == 0: |
|
|
self._run_advisor(self._get_mcp_world_state()) |
|
|
|
|
|
|
|
|
if self._on_update: |
|
|
try: |
|
|
self._on_update() |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
if after_action_context: |
|
|
self._launch_after_action_report(after_action_context) |
|
|
after_action_context = None |
|
|
|
|
|
if sim_should_stop: |
|
|
break |
|
|
|
|
|
|
|
|
time.sleep(self.tick_interval) |
|
|
|
|
|
except Exception as e: |
|
|
with self._lock: |
|
|
self._add_log("error", f"Simulation error: {str(e)}") |
|
|
break |
|
|
|
|
|
def _archive_current_cycle_locked(self): |
|
|
"""Move the completed cycle messages into history (caller must hold _lock).""" |
|
|
if not self._current_cycle_messages: |
|
|
return |
|
|
self._advisor_history.extend(self._current_cycle_messages) |
|
|
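# Keep the transcript bounded: once the history exceeds 42 messages, retain only the most recent 39.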
if len(self._advisor_history) > 42: |
|
|
self._advisor_history = self._advisor_history[-39:] |
|
|
self._current_cycle_messages = [] |
|
|
|
|
|
def _run_advisor(self, state: dict): |
|
|
""" |
|
|
Run advisor analysis with progressive stage display. |
|
|
Each stage is shown one at a time: Assessment β Planning β Execution β Summary. |
|
|
""" |
|
|
|
|
|
if self._advisor_running: |
|
|
return |
|
|
|
|
|
self._advisor_running = True |
|
|
tick = state.get("tick", 0) |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._archive_current_cycle_locked() |
|
|
|
|
|
|
|
|
self._current_cycle_messages = [] |
|
|
self._thinking_start_tick = tick |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
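# Stage 1 — Assessment. Release the lock around the LLM call so other threads can read state while the advisor thinks.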
self._current_stage = 1 |
|
|
self._is_thinking = True |
|
|
|
|
|
self._lock.release() |
|
|
try: |
|
|
assessment = self.advisor.assess(state) |
|
|
finally: |
|
|
self._lock.acquire() |
|
|
|
|
|
|
|
|
self._add_assessment_message(assessment, state, tick) |
|
|
self._is_thinking = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
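# Stage 2 — Planning, derived from the assessment above (lock released around the LLM call).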
self._current_stage = 2 |
|
|
self._is_thinking = True |
|
|
|
|
|
self._lock.release() |
|
|
try: |
|
|
plan = self.advisor.plan(state, assessment) |
|
|
finally: |
|
|
self._lock.acquire() |
|
|
|
|
|
self._add_planning_message(plan, tick) |
|
|
self._is_thinking = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
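# Stage 3 — Execution: turn the plan into concrete unit recommendations.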
self._current_stage = 3 |
|
|
self._is_thinking = True |
|
|
|
|
|
self._lock.release() |
|
|
try: |
|
|
recommendations = self.advisor.execute(state, assessment, plan) |
|
|
finally: |
|
|
self._lock.acquire() |
|
|
|
|
|
|
|
|
self._add_execution_message(recommendations, tick) |
|
|
|
|
|
|
|
|
response = self._build_advisor_response(assessment, plan, recommendations) |
|
|
self._latest_recommendations = response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
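# Stage 4 — Summary of the full cycle (assessment, plan, recommendations, response).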
self._current_stage = 4 |
|
|
self._is_thinking = True |
|
|
|
|
|
self._lock.release() |
|
|
try: |
|
|
cycle_summary = self.advisor.summarize(state, assessment, plan, recommendations, response) |
|
|
finally: |
|
|
self._lock.acquire() |
|
|
|
|
|
self._is_thinking = False |
|
|
self._add_summary_cycle_message(cycle_summary, tick) |
|
|
self._record_cycle_summary(tick, cycle_summary, self.engine.get_state()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
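# Stage 5 — cycle complete: log the advisor response and, if enabled, auto-execute its recommendations.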
self._current_stage = 5 |
|
|
|
|
|
self._add_log( |
|
|
"advisor", |
|
|
response.summary, |
|
|
{ |
|
|
"recommendations": [r.to_dict() for r in response.recommendations], |
|
|
"thinking": response.thinking, |
|
|
"analysis": response.analysis, |
|
|
"priority": response.priority, |
|
|
"error": response.error |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
if self._auto_execute and response.recommendations: |
|
|
self._execute_recommendations(response, tick) |
|
|
|
|
|
except Exception as e: |
|
|
self._is_thinking = False |
|
|
self._current_stage = 0 |
|
|
self._add_log("error", f"Advisor error: {str(e)}") |
|
|
|
|
|
self._current_cycle_messages.append({ |
|
|
"role": "assistant", |
|
|
"content": f"β AI Advisor Error: {str(e)}" |
|
|
}) |
|
|
finally: |
|
|
self._advisor_running = False |
|
|
|
|
|
self._archive_current_cycle_locked() |
|
|
|
|
|
def _add_assessment_message(self, assessment: AssessmentResult, state: dict, tick: int): |
|
|
"""Add the Assessment message (Stage 1) with MCP tool calls to current cycle.""" |
|
|
|
|
|
fires = state.get("fires", []) |
|
|
units = state.get("units", []) |
|
|
buildings = state.get("buildings", []) |
|
|
building_integrity = state.get("building_integrity", 1.0) |
|
|
status = state.get("status", "running") |
|
|
width = state.get("width", 10) |
|
|
height = state.get("height", 10) |
|
|
|
|
|
|
|
|
emoji_map = generate_emoji_map(self.engine) |
|
|
|
|
|
priority_emoji = { |
|
|
"CRITICAL": "π΄", |
|
|
"HIGH": "π ", |
|
|
"MODERATE": "π‘", |
|
|
"LOW": "π’" |
|
|
} |
|
|
emoji = priority_emoji.get(assessment.threat_level, "βͺ") |
|
|
|
|
|
content = f""" |
|
|
### π Stage 1 Β· Assessment `[Tick {tick}]` |
|
|
|
|
|
#### π§ MCP Tool Calls |
|
|
|
|
|
<details> |
|
|
<summary>π€ <code>mcp.get_world_state()</code></summary> |
|
|
|
|
|
```python |
|
|
result = mcp.get_world_state() |
|
|
``` |
|
|
|
|
|
**Response** |
|
|
``` |
|
|
status: {status} | grid: {width}x{height} |
|
|
fires: {len(fires)} | units: {len(units)}/{state.get('max_units', 10)} |
|
|
buildings: {len(buildings)} | integrity: {building_integrity:.0%} |
|
|
``` |
|
|
|
|
|
``` |
|
|
{emoji_map} |
|
|
``` |
|
|
|
|
|
_Legend: π² Forest Β· π’ Building Β· π₯ Fire Β· π¨ Smoke Β· π Truck Β· π Heli_ |
|
|
</details> |
|
|
""".strip() |
|
|
|
|
|
if assessment.ineffective_units: |
|
|
idle_lines = [] |
|
|
for u in assessment.ineffective_units[:5]: |
|
|
idle_lines.append(f"- {u.get('type', 'unit')} at ({u.get('x', 0)}, {u.get('y', 0)})") |
|
|
if len(assessment.ineffective_units) > 5: |
|
|
idle_lines.append(f"- ... and {len(assessment.ineffective_units) - 5} more") |
|
|
idle_block = "\n".join(idle_lines) |
|
|
content += f""" |
|
|
|
|
|
<details> |
|
|
<summary>π€ <code>mcp.find_idle_units()</code> β β οΈ {len(assessment.ineffective_units)} idle</summary> |
|
|
|
|
|
```python |
|
|
{idle_block} |
|
|
``` |
|
|
</details> |
|
|
""" |
|
|
else: |
|
|
content += """ |
|
|
|
|
|
<details> |
|
|
<summary>π€ <code>mcp.find_idle_units()</code> → ✅ All effective</summary>
|
|
|
|
|
```python |
|
|
# Every deployed unit is actively covering a fire |
|
|
``` |
|
|
</details> |
|
|
""" |
|
|
|
|
|
if assessment.uncovered_fires: |
|
|
fire_lines = [] |
|
|
for f in assessment.uncovered_fires[:5]: |
|
|
fire_lines.append(f"- Fire at ({f.get('x', 0)}, {f.get('y', 0)}) Β· intensity={f.get('intensity', 0):.0%}") |
|
|
if len(assessment.uncovered_fires) > 5: |
|
|
fire_lines.append(f"- ... and {len(assessment.uncovered_fires) - 5} more") |
|
|
fire_block = "\n".join(fire_lines) |
|
|
content += f""" |
|
|
|
|
|
<details> |
|
|
<summary>π€ <code>mcp.find_uncovered_fires()</code> β π¨ {len(assessment.uncovered_fires)} uncovered</summary> |
|
|
|
|
|
```python |
|
|
{fire_block} |
|
|
``` |
|
|
</details> |
|
|
""" |
|
|
else: |
|
|
content += """ |
|
|
|
|
|
<details> |
|
|
<summary>π€ <code>mcp.find_uncovered_fires()</code> → ✅ All covered</summary>
|
|
|
|
|
```python |
|
|
# All active fires currently have unit coverage |
|
|
``` |
|
|
</details> |
|
|
""" |
|
|
|
|
|
content += f""" |
|
|
#### π Analysis Results |
|
|
|
|
|
**Threat Level:** {emoji} {assessment.threat_level} |
|
|
|
|
|
**Fire Analysis** |
|
|
- Total fires: {assessment.fire_count} |
|
|
- High intensity (>70%): {len(assessment.high_intensity_fires)} |
|
|
- Building threats: {len(assessment.building_threats)} |
|
|
- β οΈ Uncovered fires: {len(assessment.uncovered_fires)} |
|
|
|
|
|
**Unit Analysis** |
|
|
- Deployed: {assessment.unit_count}/{assessment.max_units} |
|
|
- Effective: {len(assessment.effective_units)} |
|
|
- Idle: {len(assessment.ineffective_units)} |
|
|
- Coverage ratio: {assessment.coverage_ratio:.0%} |
|
|
|
|
|
**Summary:** {assessment.summary} |
|
|
""" |
|
|
content += "\n\n---\n" |
|
|
self._current_cycle_messages.append({"role": "assistant", "content": content}) |
|
|
|
|
|
def _add_planning_message(self, plan, tick: int): |
|
|
"""Add the Planning message (Stage 2) to current cycle.""" |
|
|
strategy_emoji = { |
|
|
"deploy_new": "π", |
|
|
"optimize_existing": "π", |
|
|
"balanced": "βοΈ", |
|
|
"monitor": "π" |
|
|
} |
|
|
s_emoji = strategy_emoji.get(plan.strategy, "π") |
|
|
|
|
|
action_lines = [] |
|
|
if plan.deploy_count > 0: |
|
|
action_lines.append(f"- Deploy: {plan.deploy_count} new unit(s)") |
|
|
if plan.reposition_units: |
|
|
action_lines.append(f"- Reposition: {len(plan.reposition_units)} idle unit(s)") |
|
|
if plan.priority_targets: |
|
|
action_lines.append(f"- Priority fires: {len(plan.priority_targets)}") |
|
|
|
|
|
content = f""" |
|
|
### π― Stage 2 Β· Planning `[Tick {tick}]` |
|
|
|
|
|
**Strategy:** {s_emoji} `{plan.strategy.upper()}` |
|
|
|
|
|
**Reasoning** |
|
|
{plan.reasoning} |
|
|
""".strip() |
|
|
|
|
|
if action_lines: |
|
|
content += "\n\n**Action Outline**\n" + "\n".join(action_lines) |
|
|
|
|
|
content += "\n\n---\n" |
|
|
|
|
|
self._current_cycle_messages.append({"role": "assistant", "content": content}) |
|
|
|
|
|
def _add_execution_message(self, recommendations, tick: int): |
|
|
"""Add the Execution message (Stage 3) with MCP tool calls to current cycle.""" |
|
|
if not recommendations: |
|
|
content = f"### β‘ Stage 3 Β· Execution `[Tick {tick}]`\n\nβ
**No actions required.** Current deployments already cover every fire." |
|
|
content += "\n\n---\n" |
|
|
self._current_cycle_messages.append({"role": "assistant", "content": content}) |
|
|
self._record_action_breakdown(tick, 0, 0, 0) |
|
|
return |
|
|
|
|
|
move_count = sum(1 for r in recommendations if getattr(r, "action", "deploy") == "move") |
|
|
replace_count = sum(1 for r in recommendations if getattr(r, "action", "deploy") == "replace") |
|
|
deploy_count = len(recommendations) - move_count - replace_count |
|
|
|
|
|
summary = [] |
|
|
if move_count: |
|
|
summary.append(f"- π Reposition {move_count} idle unit(s)") |
|
|
if replace_count: |
|
|
summary.append(f"- π Replace {replace_count} unit(s)") |
|
|
if deploy_count: |
|
|
summary.append(f"- π Deploy {deploy_count} additional unit(s)") |
|
|
|
|
|
content = f"### β‘ Stage 3 Β· Execution `[Tick {tick}]`\n\n" + "\n".join(summary) + "\n\n#### π§ MCP Tool Actions\n" |
|
|
|
|
|
for idx, rec in enumerate(recommendations, 1): |
|
|
unit_emoji = "π" if rec.suggested_unit_type == "fire_truck" else "π" |
|
|
unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
|
|
action = getattr(rec, "action", "deploy") |
|
|
|
|
|
if action == "move": |
|
|
source_x = getattr(rec, "source_x", 0) |
|
|
source_y = getattr(rec, "source_y", 0) |
|
|
block = f""" |
|
|
<details> |
|
|
<summary>{idx}. π Move {unit_name} from ({source_x}, {source_y}) β ({rec.target_x}, {rec.target_y})</summary> |
|
|
|
|
|
```python |
|
|
mcp.move_unit( |
|
|
source_x={source_x}, source_y={source_y}, |
|
|
target_x={rec.target_x}, target_y={rec.target_y} |
|
|
) |
|
|
``` |
|
|
|
|
|
π‘ _{rec.reason}_ |
|
|
</details> |
|
|
""" |
|
|
elif action == "replace": |
|
|
old_type = getattr(rec, "old_unit_type", "fire_truck") |
|
|
old_name = "Fire Truck" if old_type == "fire_truck" else "Helicopter" |
|
|
old_emoji = "π" if old_type == "fire_truck" else "π" |
|
|
block = f""" |
|
|
<details> |
|
|
<summary>{idx}. π Replace {old_name} {old_emoji} with {unit_name} {unit_emoji} at ({rec.target_x}, {rec.target_y})</summary> |
|
|
|
|
|
```python |
|
|
mcp.replace_unit( |
|
|
x={rec.target_x}, y={rec.target_y}, |
|
|
new_unit_type="{rec.suggested_unit_type}" |
|
|
) |
|
|
``` |
|
|
|
|
|
π‘ _{rec.reason}_ |
|
|
</details> |
|
|
""" |
|
|
else: |
|
|
block = f""" |
|
|
<details> |
|
|
<summary>{idx}. {unit_emoji} Deploy {unit_name} to ({rec.target_x}, {rec.target_y})</summary> |
|
|
|
|
|
```python |
|
|
mcp.deploy_unit( |
|
|
unit_type="{rec.suggested_unit_type}", |
|
|
x={rec.target_x}, y={rec.target_y} |
|
|
) |
|
|
``` |
|
|
|
|
|
π‘ _{rec.reason}_ |
|
|
</details> |
|
|
""" |
|
|
content += "\n" + block.strip() + "\n" |
|
|
|
|
|
content += "\n\n---\n" |
|
|
|
|
|
self._current_cycle_messages.append({"role": "assistant", "content": content}) |
|
|
self._record_action_breakdown(tick, deploy_count, move_count, replace_count) |
|
|
|
|
|
def _add_summary_cycle_message(self, cycle_summary: CycleSummary, tick: int): |
|
|
"""Add Stage 4 summary message to the current cycle.""" |
|
|
highlights = "\n".join(f"- {item}" for item in cycle_summary.key_highlights) or "- (none)" |
|
|
risks = "\n".join(f"- {item}" for item in cycle_summary.risks) or "- (none)" |
|
|
next_focus = "\n".join(f"- {item}" for item in cycle_summary.next_focus) or "- (none)" |
|
|
content = f""" |
|
|
### π§ Stage 4 Β· Summary `[Tick {tick}]` |
|
|
|
|
|
**Headline:** {cycle_summary.headline} |
|
|
|
|
|
**Threat Level:** {cycle_summary.threat_level} |
|
|
|
|
|
**Key Highlights** |
|
|
{highlights} |
|
|
|
|
|
**Risks / Gaps** |
|
|
{risks} |
|
|
|
|
|
**Next Focus** |
|
|
{next_focus} |
|
|
""".strip() |
|
|
content += "\n\n---\n" |
|
|
self._current_cycle_messages.append({"role": "assistant", "content": content}) |
|
|
|
|
|
def _build_advisor_response(self, assessment: AssessmentResult, plan: PlanResult, recommendations: list) -> AdvisorResponse: |
|
|
"""Build the final AdvisorResponse object.""" |
|
|
|
|
|
|
|
|
thinking_parts = [ |
|
|
f"π Scanning {assessment.fire_count} active fires...", |
|
|
] |
|
|
if assessment.uncovered_fires: |
|
|
thinking_parts.append(f"π¨ ALERT: {len(assessment.uncovered_fires)} fire(s) with NO coverage!") |
|
|
if assessment.building_threats: |
|
|
thinking_parts.append(f"π’ {len(assessment.building_threats)} fire(s) threatening buildings!") |
|
|
if assessment.ineffective_units: |
|
|
thinking_parts.append(f"π {len(assessment.ineffective_units)} idle unit(s) should be repositioned") |
|
|
thinking_parts.append(f"π― Strategy: {plan.strategy.upper()} - {plan.reasoning}") |
|
|
|
|
|
|
|
|
priority_emoji = {"CRITICAL": "π΄", "HIGH": "π ", "MODERATE": "π‘", "LOW": "π’"} |
|
|
emoji = priority_emoji.get(assessment.threat_level, "βͺ") |
|
|
|
|
|
if assessment.threat_level == "CRITICAL": |
|
|
summary = f"{emoji} CRITICAL: {assessment.summary}. Immediate action required!" |
|
|
elif assessment.threat_level == "HIGH": |
|
|
summary = f"{emoji} HIGH: {assessment.summary}. Rapid response needed." |
|
|
elif assessment.threat_level == "MODERATE": |
|
|
summary = f"{emoji} MODERATE: {assessment.summary}. Tactical deployment advised." |
|
|
else: |
|
|
summary = f"{emoji} LOW: {assessment.summary}. Monitoring situation." |
|
|
|
|
|
return AdvisorResponse( |
|
|
summary=summary, |
|
|
recommendations=recommendations, |
|
|
thinking="\n".join(thinking_parts), |
|
|
analysis=f"{assessment.fire_count} fires | {assessment.unit_count}/{assessment.max_units} units | {assessment.building_integrity:.0%} building integrity", |
|
|
priority=assessment.threat_level, |
|
|
assessment=assessment, |
|
|
plan=plan |
|
|
) |
|
|
|
|
|
def _execute_recommendations(self, response: AdvisorResponse, tick: int): |
|
|
"""Execute AI recommendations (must be called with lock held).""" |
|
|
executed_count = 0 |
|
|
for rec in response.recommendations: |
|
|
action = getattr(rec, "action", "deploy") |
|
|
rec_key = f"{tick}_{action}_{rec.suggested_unit_type}_{rec.target_x}_{rec.target_y}" |
|
|
|
|
|
if rec_key in self._executed_recommendations: |
|
|
continue |
|
|
|
|
|
if action == "move": |
|
|
source_x = getattr(rec, "source_x", -1) |
|
|
source_y = getattr(rec, "source_y", -1) |
|
|
if source_x < 0 or source_y < 0: |
|
|
self._add_log("error", f"π€ AI move failed: missing source ({source_x},{source_y})") |
|
|
continue |
|
|
result = self._call_mcp_tool( |
|
|
"move_unit", |
|
|
source_x=source_x, |
|
|
source_y=source_y, |
|
|
target_x=rec.target_x, |
|
|
target_y=rec.target_y, |
|
|
) |
|
|
if result.get("status") == "ok": |
|
|
executed_count += 1 |
|
|
unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
|
|
self._add_log( |
|
|
"deploy", |
|
|
f"π€ AI moved {unit_name}: ({source_x},{source_y}) β ({rec.target_x},{rec.target_y})", |
|
|
{"source": "ai", "reason": rec.reason, "action": "move"}, |
|
|
) |
|
|
self._executed_recommendations.add(rec_key) |
|
|
else: |
|
|
self._add_log( |
|
|
"error", |
|
|
f"π€ AI move failed: {result.get('message', 'unknown error')} " |
|
|
f"(source=({source_x},{source_y}) target=({rec.target_x},{rec.target_y}))", |
|
|
) |
|
|
elif action == "remove": |
|
|
result = self._call_mcp_tool("remove_unit", x=rec.target_x, y=rec.target_y) |
|
|
if result.get("status") == "ok": |
|
|
executed_count += 1 |
|
|
unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
|
|
self._add_log( |
|
|
"deploy", |
|
|
f"π€ AI removed {unit_name} at ({rec.target_x},{rec.target_y}) - ready to redeploy", |
|
|
{"source": "ai", "reason": rec.reason, "action": "remove"}, |
|
|
) |
|
|
self._executed_recommendations.add(rec_key) |
|
|
else: |
|
|
self._add_log( |
|
|
"error", |
|
|
f"π€ AI remove failed: {result.get('message', 'unknown error')} at " |
|
|
f"({rec.target_x},{rec.target_y})", |
|
|
) |
|
|
else: |
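# Anything that is not a move or remove is treated as a deploy (including any other action values).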
|
|
result = self._call_mcp_tool( |
|
|
"deploy_unit", |
|
|
unit_type=rec.suggested_unit_type, |
|
|
x=rec.target_x, |
|
|
y=rec.target_y, |
|
|
source="ai", |
|
|
) |
|
|
if result.get("status") == "ok": |
|
|
executed_count += 1 |
|
|
unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
|
|
self._add_log( |
|
|
"deploy", |
|
|
f"π€ AI deployed {unit_name} at ({rec.target_x}, {rec.target_y})", |
|
|
{"source": "ai", "reason": rec.reason, "action": "deploy"}, |
|
|
) |
|
|
self._executed_recommendations.add(rec_key) |
|
|
else: |
|
|
self._add_log( |
|
|
"error", |
|
|
f"π€ AI deploy failed: {result.get('message', 'unknown error')} " |
|
|
f"at ({rec.target_x}, {rec.target_y})", |
|
|
) |
|
|
|
|
|
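# Size cap only: sets are unordered, so which executed keys survive the trim is arbitrary.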
if len(self._executed_recommendations) > 100: |
|
|
self._executed_recommendations = set(list(self._executed_recommendations)[-50:]) |
|
|
|
|
|
def _add_log(self, event_type: str, message: str, details: Optional[dict] = None): |
|
|
"""Add a log entry (must be called with lock held).""" |
|
|
tick = self.engine.world.tick if self.engine.world else 0 |
|
|
|
|
|
self._logs.append(LogEntry( |
|
|
timestamp=datetime.now().strftime("%H:%M:%S"), |
|
|
tick=tick, |
|
|
event_type=event_type, |
|
|
message=message, |
|
|
details=details |
|
|
)) |
|
|
|
|
|
|
|
|
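# Bound memory usage: once the log grows past 200 entries, keep only the newest 100.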
if len(self._logs) > 200: |
|
|
self._logs = self._logs[-100:] |
|
|
|
|
|
|
|
|
|
|
|
_service: Optional[SimulationService] = None |
|
|
|
|
|
|
|
|
def get_service() -> SimulationService: |
|
|
"""Get or create the global simulation service.""" |
|
|
global _service |
|
|
if _service is None: |
|
|
_service = SimulationService() |
|
|
return _service |
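# Minimal usage sketch (hedged): how a Gradio or HTTP layer might drive the service.
# Only methods defined in this file are referenced; starting and stopping the loop is
# handled by the service's own start/stop methods defined elsewhere in this module.
#
#     service = get_service()
#     service.set_advisor_model_choice(DEFAULT_ADVISOR_MODEL_CHOICE)
#     service.set_auto_execute(True)
#     if not service.is_running():
#         pass  # call the service's start method here (name defined elsewhere)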
|
|
|
|
|
|