| | """ |
| | Fire-Rescue MCP - Simulation Service |
| | |
| | Background service that manages simulation loop and LLM advisor evaluations. |
| | Designed for integration with Gradio and HTTP API endpoints. |
| | """ |
| |
|
| | import asyncio |
| | import concurrent.futures |
| | import html |
| | import json |
| | import os |
| | import threading |
| | import time |
| | from dataclasses import dataclass, field |
| | from datetime import datetime |
| | from typing import Any, Callable, Optional |
| |
|
| | from agent import ( |
| | AdvisorAgent, |
| | AdvisorResponse, |
| | AfterActionReport, |
| | AssessmentResult, |
| | PlanResult, |
| | CycleSummary, |
| | ) |
| | from config import range_text |
| | from fire_rescue_mcp.mcp_client import LocalFastMCPClient |
| | from fire_rescue_mcp.mcp_server import attach_engine, detach_engine, mcp as fastmcp_server |
| | from models import SimulationStatus, CellType |
| | from simulation import SimulationEngine |
| |
|
| | FIRE_COUNT_RANGE_TEXT = range_text("fire_count") |
| | BUILDING_COUNT_RANGE_TEXT = range_text("building_count") |
| |
|
| |
|
| | ADVISOR_MODEL_CHOICES = { |
| | "GPT-OSS Β· HuggingFace (openai/gpt-oss-120b)": { |
| | "provider": "hf", |
| | "model": "openai/gpt-oss-120b", |
| | "description": "Default OSS advisor routed through HuggingFace Inference", |
| | }, |
| | "GPT-OSS-20B Β· HuggingFace (openai/gpt-oss-20b)": { |
| | "provider": "hf", |
| | "model": "openai/gpt-oss-20b", |
| | "description": "OpenAI GPT-OSS 20B model via HuggingFace Inference", |
| | }, |
| | "Llama-3.1 Β· HuggingFace (meta-llama/Llama-3.1-8B-Instruct)": { |
| | "provider": "hf", |
| | "model": "meta-llama/Llama-3.1-8B-Instruct", |
| | "description": "Meta Llama-3.1 8B Instruct model via HuggingFace Inference", |
| | }, |
| | "OpenAI Β· gpt-5.1": { |
| | "provider": "openai", |
| | "model": "gpt-5.1", |
| | "description": "Flagship GPT-5.1 via native OpenAI API", |
| | }, |
| | } |
| |
|
| | DEFAULT_ADVISOR_MODEL_CHOICE = "GPT-OSS Β· HuggingFace (openai/gpt-oss-120b)" |
| |
|
| |
|
| | def generate_emoji_map(engine: SimulationEngine) -> str: |
| | """ |
| | Generate an emoji-based visualization of the current world state. |
| | Matches Gradio UI: π²Forest π’Building π₯Fire π¨Smoke πTruck πHeli |
| | """ |
| | if engine.world is None: |
| | return "No map available" |
| | |
| | world = engine.world |
| | |
| | |
| | unit_positions = {} |
| | for unit in world.units: |
| | key = (unit.x, unit.y) |
| | if key not in unit_positions: |
| | unit_positions[key] = [] |
| | unit_positions[key].append(unit.unit_type.value) |
| | |
| | |
| | lines = [] |
| | |
| | |
| | header = " " + "".join(f"{x:2}" for x in range(world.width)) |
| | lines.append(header) |
| | |
| | for y in range(world.height): |
| | row_chars = [] |
| | for x in range(world.width): |
| | cell = world.grid[y][x] |
| | pos = (x, y) |
| | |
| | |
| | if pos in unit_positions: |
| | if "fire_truck" in unit_positions[pos]: |
| | row_chars.append("π") |
| | else: |
| | row_chars.append("π") |
| | elif cell.fire_intensity > 0: |
| | if cell.fire_intensity >= 0.1: |
| | row_chars.append("π₯") |
| | else: |
| | row_chars.append("π¨") |
| | else: |
| | if cell.cell_type == CellType.BUILDING: |
| | row_chars.append("π’") |
| | elif cell.cell_type == CellType.FOREST: |
| | row_chars.append("π²") |
| | else: |
| | row_chars.append("β¬") |
| | |
| | lines.append(f"{y:2} " + "".join(row_chars)) |
| | |
| | return "\n".join(lines) |
| |
|
| |
|
| | @dataclass |
| | class LogEntry: |
| | """A single log entry for the simulation.""" |
| | timestamp: str |
| | tick: int |
| | event_type: str |
| | message: str |
| | details: Optional[dict] = None |
| | |
| | def to_dict(self) -> dict: |
| | return { |
| | "timestamp": self.timestamp, |
| | "tick": self.tick, |
| | "event_type": self.event_type, |
| | "message": self.message, |
| | "details": self.details |
| | } |
| |
|
| |
|
| | @dataclass |
| | class SimulationService: |
| | """ |
| | Service that manages the simulation lifecycle and LLM advisor. |
| | |
| | Provides: |
| | - Background simulation loop |
| | - Periodic LLM advisor evaluations |
| | - Thread-safe state access |
| | - Event logging |
| | """ |
| | |
| | |
| | tick_interval: float = 1.0 |
| | advisor_interval: int = 10 |
| | |
| | |
| | engine: SimulationEngine = field(default_factory=SimulationEngine) |
| | advisor: AdvisorAgent = field(default_factory=AdvisorAgent) |
| | |
| | |
| | _running: bool = False |
| | _thread: Optional[threading.Thread] = None |
| | _lock: threading.RLock = field(default_factory=threading.RLock) |
| | |
| | |
| | _logs: list[LogEntry] = field(default_factory=list) |
| | _latest_recommendations: Optional[AdvisorResponse] = None |
| | _on_update: Optional[Callable] = None |
| | |
| | |
| | _is_thinking: bool = False |
| | _thinking_start_tick: int = 0 |
| | _current_stage: int = 0 |
| | _current_cycle_messages: list = field(default_factory=list) |
| | |
| | |
| | _advisor_history: list[dict] = field(default_factory=list) |
| | _cycle_summaries: list[dict] = field(default_factory=list) |
| | _metrics_history: list[dict] = field(default_factory=list) |
| | _threat_history: list[dict] = field(default_factory=list) |
| | _action_history: list[dict] = field(default_factory=list) |
| | _player_actions: list[dict] = field(default_factory=list) |
| |
|
| | |
| | _mcp_client: LocalFastMCPClient | None = None |
| | _mcp_call_log: list[dict] = field(default_factory=list) |
| | _mcp_log_dirty: bool = False |
| | _last_mcp_log: str = "" |
| | |
| | |
| | _advisor_running: bool = False |
| | advisor_timeout: float = 30.0 |
| | advisor_max_retries: int = 3 |
| | |
| | |
| | _result_shown: bool = False |
| | _result_dismissed: bool = False |
| | _result_report: Optional[AfterActionReport] = None |
| | _result_report_status: str = "idle" |
| | _result_report_error: str = "" |
| | _last_result_signature: str = "" |
| | _last_result_payload_signature: str = "" |
| | |
| | |
| | _auto_execute: bool = True |
| | _executed_recommendations: set = field(default_factory=set) |
| | _session_owner_id: str = "" |
| | _attached_session_id: str = "" |
| | _model_choice: str = DEFAULT_ADVISOR_MODEL_CHOICE |
| | |
| | |
| | _tick_count: int = 0 |
| | _advisor_first_run: bool = True |
| | |
| | |
| | _last_grid_hash: str = "" |
| | _last_advisor_signature: tuple = field(default_factory=tuple) |
| | _last_history_signature: tuple = field(default_factory=tuple) |
| | _last_event_log: str = "" |
| | _last_button_states: tuple = () |
| | _last_result_state: str = "" |
| | |
| | def __post_init__(self): |
| | self._lock = threading.RLock() |
| | self._logs = [] |
| | self._result_shown = False |
| | self._result_dismissed = False |
| | self._reset_after_action_report_locked() |
| | self._is_thinking = False |
| | self._current_stage = 0 |
| | self._current_cycle_messages = [] |
| | self._advisor_history = [] |
| | self._cycle_summaries = [] |
| | self._metrics_history = [] |
| | self._threat_history = [] |
| | self._action_history = [] |
| | self._player_actions = [] |
| | self._advisor_running = False |
| | self._auto_execute = True |
| | self._executed_recommendations = set() |
| | self._tick_count = 0 |
| | self._advisor_first_run = True |
| | self._session_owner_id = "" |
| | |
| | self._last_grid_hash = "" |
| | self._last_advisor_signature = () |
| | self._last_history_signature = () |
| | self._last_event_log = "" |
| | self._last_button_states = (True, False) |
| | self._last_result_state = "" |
| | self._mcp_client = LocalFastMCPClient(fastmcp_server, self._record_mcp_call) |
| | self._mcp_call_log = [] |
| | self._mcp_log_dirty = False |
| | self._last_mcp_log = "" |
| | |
| | def start( |
| | self, |
| | seed: Optional[int] = None, |
| | fire_count: int = 4, |
| | fire_intensity: float = 0.6, |
| | building_count: int = 16, |
| | max_units: int = 10, |
| | session_id: Optional[str] = None, |
| | on_update: Optional[Callable] = None |
| | ) -> dict: |
| | f""" |
| | Start a new simulation. |
| | |
| | Args: |
| | seed: Random seed for reproducibility |
| | fire_count: Number of initial fire points ({FIRE_COUNT_RANGE_TEXT}) |
| | fire_intensity: Initial fire intensity (0.0-1.0) |
| | building_count: Number of buildings to place ({BUILDING_COUNT_RANGE_TEXT}) |
| | on_update: Callback function called on state changes |
| | |
| | Returns: |
| | Initial world state |
| | """ |
| | |
| | thread = None |
| | with self._lock: |
| | if self._running: |
| | self._running = False |
| | thread = self._thread |
| | self._thread = None |
| | |
| | |
| | if thread and thread.is_alive(): |
| | thread.join(timeout=2.0) |
| | |
| | |
| | with self._lock: |
| | |
| | self._logs = [] |
| | self._latest_recommendations = None |
| | self._advisor_history = [] |
| | self._cycle_summaries = [] |
| | self._current_cycle_messages = [] |
| | self._current_stage = 0 |
| | self._is_thinking = False |
| | self._result_shown = False |
| | self._result_dismissed = False |
| | self._reset_after_action_report_locked() |
| | self._executed_recommendations = set() |
| | self._tick_count = 0 |
| | self._advisor_first_run = True |
| | self._on_update = on_update |
| | self._metrics_history = [] |
| | self._threat_history = [] |
| | self._action_history = [] |
| | self._player_actions = [] |
| | self._session_owner_id = session_id or "" |
| | |
| | self._last_grid_hash = "" |
| | self._last_advisor_signature = () |
| | self._last_history_signature = () |
| | self._last_event_log = "" |
| | self._last_button_states = (True, False) |
| | self._last_result_state = "" |
| | |
| | |
| | self.engine.reset( |
| | seed=seed, |
| | fire_count=fire_count, |
| | fire_intensity=fire_intensity, |
| | building_count=building_count, |
| | max_units=max_units |
| | ) |
| | self._record_tick_metrics_locked(self.engine.get_state()) |
| | |
| | |
| | self._add_log("status", f"Simulation started: {fire_count} fires, {building_count} buildings, max {max_units} units") |
| | |
| | |
| | self._running = True |
| | self._thread = threading.Thread(target=self._simulation_loop, daemon=True) |
| | self._thread.start() |
| | |
| | return self._compose_state_locked() |
| | |
| | def resume(self, on_update: Optional[Callable] = None) -> dict: |
| | """ |
| | Resume a paused simulation. |
| | |
| | Returns: |
| | Current world state, or error if no paused simulation exists |
| | """ |
| | with self._lock: |
| | |
| | if self.engine.world is None: |
| | return {"status": "error", "message": "No simulation to resume"} |
| | |
| | |
| | if self._running: |
| | return {"status": "error", "message": "Simulation is already running"} |
| | |
| | |
| | current_status = self.engine.world.status |
| | if current_status in [SimulationStatus.SUCCESS, SimulationStatus.FAIL]: |
| | return {"status": "error", "message": f"Simulation has ended ({current_status.value})"} |
| | |
| | self._on_update = on_update |
| | |
| | |
| | self.engine.world.status = SimulationStatus.RUNNING |
| | |
| | |
| | self._add_log("status", "Simulation resumed") |
| | |
| | |
| | self._running = True |
| | self._thread = threading.Thread(target=self._simulation_loop, daemon=True) |
| | self._thread.start() |
| | |
| | return self._compose_state_locked() |
| | |
| | def pause(self) -> dict: |
| | """Pause the simulation (can be resumed later).""" |
| | |
| | with self._lock: |
| | self._running = False |
| | thread = self._thread |
| | self._thread = None |
| | |
| | |
| | if thread and thread.is_alive(): |
| | thread.join(timeout=2.0) |
| | |
| | with self._lock: |
| | self._add_log("status", "Simulation paused") |
| | |
| | |
| | if self.engine.world: |
| | return self._compose_state_locked() |
| | return {"status": "idle", "after_action_report": self._get_after_action_report_payload_locked()} |
| | |
| | def is_paused(self) -> bool: |
| | """Check if simulation is paused (has world but not running).""" |
| | with self._lock: |
| | if self.engine.world is None: |
| | return False |
| | |
| | return ( |
| | not self._running |
| | and self.engine.world.status == SimulationStatus.RUNNING |
| | ) |
| |
|
| | def can_resume_session(self, session_id: Optional[str]) -> bool: |
| | """Check if the provided session owns the paused simulation.""" |
| | if not session_id: |
| | return False |
| | with self._lock: |
| | if ( |
| | not self.engine.world |
| | or self._session_owner_id != session_id |
| | ): |
| | return False |
| | return ( |
| | not self._running |
| | and self.engine.world.status == SimulationStatus.RUNNING |
| | ) |
| | |
| | def _stop_internal(self): |
| | """Internal stop - sets flag only (must be called with lock held).""" |
| | self._running = False |
| | |
| | def reset( |
| | self, |
| | seed: Optional[int] = None, |
| | fire_count: int = 4, |
| | fire_intensity: float = 0.6, |
| | building_count: int = 16, |
| | max_units: int = 10, |
| | session_id: Optional[str] = None, |
| | ) -> dict: |
| | """Reset simulation without starting the loop.""" |
| | |
| | thread = None |
| | with self._lock: |
| | if self._running: |
| | self._running = False |
| | thread = self._thread |
| | self._thread = None |
| | |
| | |
| | if thread and thread.is_alive(): |
| | thread.join(timeout=2.0) |
| | |
| | |
| | with self._lock: |
| | self._logs = [] |
| | self._latest_recommendations = None |
| | self._advisor_history = [] |
| | self._cycle_summaries = [] |
| | self._current_cycle_messages = [] |
| | self._current_stage = 0 |
| | self._is_thinking = False |
| | self._result_shown = False |
| | self._result_dismissed = False |
| | self._reset_after_action_report_locked() |
| | self._executed_recommendations = set() |
| | self._tick_count = 0 |
| | self._advisor_first_run = True |
| | self._metrics_history = [] |
| | self._threat_history = [] |
| | self._action_history = [] |
| | self._session_owner_id = session_id or "" |
| | |
| | self._last_grid_hash = "" |
| | self._last_advisor_signature = () |
| | self._last_history_signature = () |
| | self._last_event_log = "" |
| | self._last_button_states = (True, False) |
| | self._last_result_state = "" |
| | |
| | self.engine.reset( |
| | seed=seed, |
| | fire_count=fire_count, |
| | fire_intensity=fire_intensity, |
| | building_count=building_count, |
| | max_units=max_units |
| | ) |
| | self._record_tick_metrics_locked(self.engine.get_state()) |
| | self._add_log("status", f"Simulation reset: {fire_count} fires, {building_count} buildings, max {max_units} units") |
| | |
| | return self._compose_state_locked() |
| | |
| | def get_state(self) -> dict: |
| | """Get current world state (thread-safe).""" |
| | with self._lock: |
| | if self.engine.world is None: |
| | return { |
| | "status": "idle", |
| | "message": "No simulation running", |
| | "after_action_report": self._get_after_action_report_payload_locked(), |
| | } |
| | return self._compose_state_locked() |
| |
|
| | def _compose_state_locked(self) -> dict: |
| | """Attach after-action report payload to the current engine state.""" |
| | state = self.engine.get_state() |
| | state["after_action_report"] = self._get_after_action_report_payload_locked() |
| | return state |
| |
|
| | def _reset_after_action_report_locked(self): |
| | """Clear cached after-action report data.""" |
| | self._result_report = None |
| | self._result_report_status = "idle" |
| | self._result_report_error = "" |
| | self._last_result_signature = "" |
| | self._last_result_payload_signature = "" |
| |
|
| | def _record_cycle_summary(self, tick: int, cycle_summary: CycleSummary, state_snapshot: Optional[dict] = None): |
| | """Store Stage 4 summary for each advisor cycle, including metrics for charts.""" |
| | state_snapshot = state_snapshot or {} |
| | metrics = { |
| | "tick": tick, |
| | "fires": len(state_snapshot.get("fires", [])), |
| | "units": len(state_snapshot.get("units", [])), |
| | "max_units": state_snapshot.get("max_units", 0), |
| | "building_integrity": state_snapshot.get("building_integrity", 1.0), |
| | } |
| | entry = { |
| | "tick": tick, |
| | "headline": cycle_summary.headline, |
| | "threat_level": cycle_summary.threat_level, |
| | "key_highlights": cycle_summary.key_highlights, |
| | "risks": cycle_summary.risks, |
| | "next_focus": cycle_summary.next_focus, |
| | "metrics": metrics, |
| | } |
| | self._cycle_summaries.append(entry) |
| | if len(self._cycle_summaries) > 30: |
| | self._cycle_summaries = self._cycle_summaries[-30:] |
| | threat_value_map = {"CRITICAL": 4, "HIGH": 3, "MODERATE": 2, "LOW": 1} |
| | value = threat_value_map.get(cycle_summary.threat_level.upper(), 0) if cycle_summary.threat_level else 0 |
| | self._threat_history.append({ |
| | "tick": tick, |
| | "threat_level": cycle_summary.threat_level, |
| | "value": value, |
| | }) |
| | if len(self._threat_history) > 120: |
| | self._threat_history = self._threat_history[-120:] |
| |
|
| | def _record_tick_metrics_locked(self, state: Optional[dict]): |
| | """Capture per-tick metrics for after-action chart visualization.""" |
| | if not state: |
| | return |
| | entry = { |
| | "tick": state.get("tick", 0), |
| | "fires": len(state.get("fires", [])), |
| | "units": len(state.get("units", [])), |
| | "max_units": state.get("max_units") or getattr(self.engine.world, "max_units", 0) if self.engine.world else 0, |
| | "building_integrity": state.get("building_integrity", 1.0), |
| | } |
| | self._metrics_history.append(entry) |
| | if len(self._metrics_history) > 600: |
| | self._metrics_history = self._metrics_history[-600:] |
| |
|
| | def _record_action_breakdown(self, tick: int, deploy: int, move: int, replace: int): |
| | """Track how many actions the AI recommended per tick.""" |
| | self._action_history.append({ |
| | "tick": tick, |
| | "deploy": deploy, |
| | "move": move, |
| | "replace": replace, |
| | }) |
| | if len(self._action_history) > 200: |
| | self._action_history = self._action_history[-200:] |
| |
|
| | def _record_player_action(self, action: str, description: str, metadata: Optional[dict] = None): |
| | """Track player-driven interventions (manual deploy/remove/fire).""" |
| | if metadata is None: |
| | metadata = {} |
| | tick = 0 |
| | if self.engine and self.engine.world: |
| | tick = getattr(self.engine.world, "tick", 0) |
| | else: |
| | tick = self._tick_count |
| | entry = { |
| | "tick": tick, |
| | "timestamp": datetime.utcnow().isoformat(), |
| | "action": action, |
| | "description": description, |
| | "details": dict(metadata), |
| | } |
| | self._player_actions.append(entry) |
| | if len(self._player_actions) > 200: |
| | self._player_actions = self._player_actions[-200:] |
| |
|
| | def _build_player_action_context(self) -> dict: |
| | """Summarize player-driven interventions for after-action reporting.""" |
| | actions = list(self._player_actions) |
| | counts = {"deploy_unit": 0, "remove_unit": 0, "add_fire": 0} |
| | action_meta = { |
| | "deploy_unit": ("π", "Deployed units"), |
| | "remove_unit": ("β»οΈ", "Removed units"), |
| | "add_fire": ("π₯", "Ignited fires"), |
| | } |
| | for entry in actions: |
| | action_type = entry.get("action") |
| | if action_type in counts: |
| | counts[action_type] += 1 |
| | total = sum(counts.values()) |
| | if total: |
| | parts = [f"Player executed {total} manual action(s)."] |
| | if counts["deploy_unit"]: |
| | parts.append(f"π Deploy: {counts['deploy_unit']} time(s)") |
| | if counts["remove_unit"]: |
| | parts.append(f"β»οΈ Remove: {counts['remove_unit']} time(s)") |
| | if counts["add_fire"]: |
| | parts.append(f"π₯ Ignite: {counts['add_fire']} time(s)") |
| | summary = " ".join(parts) |
| | else: |
| | summary = "Player has not manually deployed, removed, or ignited anything this run." |
| | recent_entries = list(reversed(actions[-6:])) |
| | recent = [ |
| | { |
| | "tick": entry.get("tick", 0), |
| | "description": entry.get("description", ""), |
| | "action": entry.get("action"), |
| | "timestamp": entry.get("timestamp"), |
| | } |
| | for entry in recent_entries |
| | ] |
| | markdown_lines = [summary] |
| | if recent: |
| | markdown_lines.append("") |
| | markdown_lines.append("**Recent player actions**") |
| | for entry in recent: |
| | icon = action_meta.get(entry["action"], ("π", ""))[0] |
| | markdown_lines.append(f"- Tick {entry['tick']} Β· {icon} {entry['description']}") |
| | markdown = "\n".join(markdown_lines).strip() |
| | return { |
| | "total": total, |
| | "counts": counts, |
| | "recent": recent, |
| | "summary": summary, |
| | "markdown": markdown, |
| | } |
| |
|
| | def _record_mcp_call(self, entry: dict[str, Any]) -> None: |
| | """Capture MCP tool invocations for UI display.""" |
| | with self._lock: |
| | cloned = dict(entry) |
| | cloned.setdefault("local_timestamp", datetime.now().strftime("%H:%M:%S")) |
| | self._mcp_call_log.append(cloned) |
| | if len(self._mcp_call_log) > 80: |
| | self._mcp_call_log = self._mcp_call_log[-80:] |
| | self._mcp_log_dirty = True |
| |
|
| | def _get_mcp_log_text_locked(self, limit: int = 20) -> str: |
| | """Format MCP tool logs for UI display.""" |
| | if not self._mcp_call_log: |
| | return "No MCP tool calls yet..." |
| | lines = [] |
| | for entry in self._mcp_call_log[-limit:]: |
| | ts = entry.get("local_timestamp", entry.get("timestamp", "")) |
| | tool = entry.get("tool", "unknown") |
| | args = entry.get("arguments", {}) |
| | args_preview = ", ".join(f"{k}={v}" for k, v in list(args.items())[:3]) |
| | result = entry.get("result", {}) |
| | status = result.get("status", "ok") if isinstance(result, dict) else "ok" |
| | duration = entry.get("duration_ms", 0) |
| | lines.append(f"[{ts}] {tool}({args_preview}) β {status} ({duration} ms)") |
| | return "\n".join(lines) |
| |
|
| | def _get_after_action_report_payload_locked(self) -> dict: |
| | """Serialize after-action report state for UI consumption.""" |
| | payload: dict = {"status": self._result_report_status} |
| | if self._result_report_status == "ready" and self._result_report: |
| | payload["report"] = self._result_report.to_dict() |
| | elif self._result_report_status == "error": |
| | payload["error"] = self._result_report_error or "Unknown error" |
| | return payload |
| |
|
| | def _call_mcp_tool(self, name: str, **kwargs: Any) -> dict[str, Any]: |
| | """Invoke a tool through the shared FastMCP client.""" |
| | session_id = self._session_owner_id or "default" |
| | if not self._mcp_client: |
| | self._mcp_client = LocalFastMCPClient(fastmcp_server, self._record_mcp_call) |
| | if self._attached_session_id and self._attached_session_id != session_id: |
| | detach_engine(self._attached_session_id) |
| | self._attached_session_id = "" |
| | if self._attached_session_id != session_id: |
| | attach_engine(self.engine, session_id) |
| | self._attached_session_id = session_id |
| | try: |
| | call_kwargs = dict(kwargs) |
| | call_kwargs["session_id"] = session_id |
| | return self._mcp_client.call_tool(name, **call_kwargs) |
| | except Exception as exc: |
| | self._add_log("error", f"MCP tool {name} failed: {exc}") |
| | return {"status": "error", "message": str(exc)} |
| |
|
| | def shutdown(self) -> None: |
| | """Stop background threads and detach from MCP server.""" |
| | thread = None |
| | with self._lock: |
| | if self._running: |
| | self._running = False |
| | thread = self._thread |
| | self._thread = None |
| | if thread and thread.is_alive(): |
| | thread.join(timeout=2.0) |
| | with self._lock: |
| | if self._attached_session_id: |
| | detach_engine(self._attached_session_id) |
| | self._attached_session_id = "" |
| | self._session_owner_id = "" |
| | self._on_update = None |
| |
|
| | def _get_mcp_world_state(self) -> dict: |
| | """Fetch world state via the MCP tools with local fallback.""" |
| | state = self._call_mcp_tool("get_world_state") |
| | if not isinstance(state, dict) or state.get("status") == "error": |
| | state = self.engine.get_state() |
| |
|
| | def _maybe(name: str) -> dict[str, Any]: |
| | data = self._call_mcp_tool(name) |
| | return data if isinstance(data, dict) else {} |
| |
|
| | state["mcp_idle_units"] = _maybe("find_idle_units") |
| | state["mcp_uncovered_fires"] = _maybe("find_uncovered_fires") |
| | state["mcp_building_threats"] = _maybe("find_building_threats") |
| | state["mcp_coverage"] = _maybe("analyze_coverage") |
| | return state |
| | |
| | def should_show_result(self) -> tuple[bool, bool]: |
| | """ |
| | Check if game result should be shown. |
| | |
| | Returns: |
| | tuple of (should_show, is_first_time) |
| | - should_show: True if popup should be displayed |
| | - is_first_time: True if this is the first time showing (needs render) |
| | """ |
| | with self._lock: |
| | |
| | if self._result_dismissed: |
| | return (False, False) |
| | |
| | state = self.engine.get_state() if self.engine.world else {"status": "idle"} |
| | status = state.get("status", "idle") |
| | |
| | if status in ["success", "fail"]: |
| | |
| | is_first_time = not self._result_shown |
| | if is_first_time: |
| | self._result_shown = True |
| | return (True, is_first_time) |
| | |
| | return (False, False) |
| | |
| | def dismiss_result(self): |
| | """Dismiss the game result popup (called when player clicks it).""" |
| | with self._lock: |
| | self._result_dismissed = True |
| | |
| | def get_result_status(self) -> str: |
| | """Get current result status.""" |
| | with self._lock: |
| | if self.engine.world is None: |
| | return "idle" |
| | state = self.engine.get_state() |
| | return state.get("status", "idle") |
| |
|
| | def _prepare_after_action_context_locked(self, outcome: str, state: dict) -> Optional[dict]: |
| | """Build context for after-action report generation.""" |
| | signature = f"{outcome}_{state.get('tick', 0)}_{len(self._logs)}" |
| | if signature == self._last_result_signature: |
| | return None |
| | context = self._build_after_action_context_locked(outcome, state) |
| | self._result_report_status = "pending" |
| | self._result_report = None |
| | self._result_report_error = "" |
| | self._last_result_signature = signature |
| | self._last_result_payload_signature = "" |
| | return context |
| |
|
| | def _build_after_action_context_locked(self, outcome: str, state: dict) -> dict: |
| | """Assemble transcripts and metrics for the after-action LLM call.""" |
| | outcome_label = "Victory" if outcome == "success" else "Defeat" |
| | fires_remaining = len(state.get("fires", [])) |
| | units_active = len(state.get("units", [])) |
| | building_integrity = state.get("building_integrity", 1.0) |
| | integrity_percent = f"{building_integrity:.0%}" if isinstance(building_integrity, (int, float)) else "N/A" |
| |
|
| | stage_messages = self._current_cycle_messages[:] if self._current_cycle_messages else self._advisor_history[-3:] |
| | transcripts = {"assessment_md": "", "planning_md": "", "execution_md": ""} |
| | for msg in stage_messages: |
| | content = msg.get("content", "") |
| | if not content: |
| | continue |
| | lowered = content.lower() |
| | if "stage 1" in lowered and not transcripts["assessment_md"]: |
| | transcripts["assessment_md"] = content |
| | elif "stage 2" in lowered and not transcripts["planning_md"]: |
| | transcripts["planning_md"] = content |
| | elif "stage 3" in lowered and not transcripts["execution_md"]: |
| | transcripts["execution_md"] = content |
| |
|
| | summary_text = ( |
| | f"Tick {state.get('tick', 0)} Β· Fires {fires_remaining} Β· " |
| | f"Units {units_active}/{state.get('max_units', 0)} Β· Building Integrity {integrity_percent}" |
| | ) |
| |
|
| | if self._metrics_history: |
| | chart_points = [dict(point) for point in self._metrics_history] |
| | else: |
| | chart_points = [] |
| | for entry in self._cycle_summaries: |
| | metrics = entry.get("metrics") or {} |
| | if not metrics: |
| | continue |
| | chart_points.append({ |
| | "tick": metrics.get("tick", entry.get("tick")), |
| | "fires": metrics.get("fires", 0), |
| | "units": metrics.get("units", 0), |
| | "max_units": metrics.get("max_units", state.get("max_units", 0)), |
| | "building_integrity": metrics.get("building_integrity", building_integrity), |
| | }) |
| |
|
| | context = { |
| | "outcome": outcome, |
| | "outcome_label": outcome_label, |
| | "tick": state.get("tick", 0), |
| | "fires_remaining": fires_remaining, |
| | "units_active": units_active, |
| | "building_integrity_percent": integrity_percent, |
| | "summary_text": summary_text, |
| | "state_snapshot": { |
| | "tick": state.get("tick", 0), |
| | "status": state.get("status", ""), |
| | "building_integrity": building_integrity, |
| | "max_units": state.get("max_units", 0), |
| | }, |
| | "cycle_summaries": list(self._cycle_summaries), |
| | "chart_points": chart_points, |
| | "threat_history": list(self._threat_history), |
| | "action_history": list(self._action_history), |
| | } |
| | player_actions_context = self._build_player_action_context() |
| | context.update(transcripts) |
| | context["player_actions_context"] = player_actions_context |
| | context["player_actions_md"] = player_actions_context.get("markdown", "") |
| | return context |
| |
|
| | def _launch_after_action_report(self, context: dict): |
| | """Run after-action report generation in the background.""" |
| |
|
| | def _runner(): |
| | try: |
| | report = self.advisor.generate_after_action_report(context) |
| | except Exception as exc: |
| | with self._lock: |
| | self._result_report = None |
| | self._result_report_status = "error" |
| | self._result_report_error = str(exc) |
| | self._last_result_payload_signature = "" |
| | return |
| |
|
| | with self._lock: |
| | if report and not report.error: |
| | self._result_report = report |
| | self._result_report_status = "ready" |
| | self._result_report_error = "" |
| | else: |
| | self._result_report = report |
| | self._result_report_status = "error" |
| | self._result_report_error = (report.error if report else "Unknown error") |
| | self._last_result_payload_signature = "" |
| |
|
| | threading.Thread(target=_runner, daemon=True).start() |
| | |
| | def get_recommendations(self) -> Optional[dict]: |
| | """Get latest advisor recommendations.""" |
| | with self._lock: |
| | if self._latest_recommendations: |
| | return self._latest_recommendations.to_dict() |
| | return None |
| | |
| | def get_logs(self, limit: int = 50) -> list[dict]: |
| | """Get recent log entries.""" |
| | with self._lock: |
| | return [log.to_dict() for log in self._logs[-limit:]] |
| | |
| | def get_logs_text(self, limit: int = 20) -> str: |
| | """Get all logs as formatted text for display.""" |
| | with self._lock: |
| | lines = [] |
| | for log in self._logs[-limit:]: |
| | if log.event_type == "advisor": |
| | lines.append(f"[Tick {log.tick}] π€ AI: {log.message}") |
| | if log.details and log.details.get("recommendations"): |
| | for rec in log.details["recommendations"]: |
| | target = rec.get("target", {}) |
| | lines.append( |
| | f" β {rec.get('suggested_unit_type')} at " |
| | f"({target.get('x')}, {target.get('y')}): {rec.get('reason', '')[:50]}" |
| | ) |
| | elif log.event_type == "deploy": |
| | lines.append(f"[Tick {log.tick}] π Deploy: {log.message}") |
| | elif log.event_type == "status": |
| | lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
| | elif log.event_type == "error": |
| | lines.append(f"[{log.timestamp}] β οΈ Error: {log.message}") |
| | else: |
| | lines.append(f"[Tick {log.tick}] {log.message}") |
| | return "\n".join(lines) |
| |
|
| | def get_mcp_log_text(self, limit: int = 20) -> str: |
| | """Expose MCP call history.""" |
| | with self._lock: |
| | return self._get_mcp_log_text_locked(limit) |
| | |
| | def get_advisor_text(self, limit: int = 5) -> str: |
| | """Get AI advisor display with rich reasoning (legacy text format).""" |
| | messages = self.get_advisor_messages(limit) |
| | if not messages: |
| | return "π€ AI Advisor standing by..." |
| | return messages[-1].get("content", "") if messages else "" |
| | |
| | def _wrap_analysis_block( |
| | self, |
| | content: str, |
| | default_summary: str = "AI Analysis", |
| | *, |
| | split_stage_sections: bool = False, |
| | open_by_default: bool = False, |
| | ) -> str: |
| | """Wrap advisor markdown in a collapsible details block.""" |
| | if not content: |
| | return content |
| | |
| | lines = content.splitlines() |
| | summary = default_summary |
| | first_value_line = "" |
| | auto_summary = default_summary == "AI Analysis" |
| | |
| | for line in lines: |
| | stripped = line.strip() |
| | if not stripped: |
| | continue |
| | if not first_value_line: |
| | first_value_line = stripped |
| | if auto_summary and stripped.lower().startswith("###"): |
| | summary = stripped.lstrip("# ").strip() |
| | break |
| | |
| | summary = html.escape(summary or default_summary) |
| | |
| | if auto_summary and first_value_line and first_value_line.lower().startswith("###"): |
| | body_lines = lines[1:] |
| | else: |
| | body_lines = lines |
| | body = "\n".join(body_lines).strip() |
| | |
| | if split_stage_sections: |
| | rendered = self._render_stage_sections(body, open_by_default) |
| | if rendered: |
| | body = rendered |
| | |
| | open_attr = " open" if open_by_default else "" |
| | return ( |
| | f"<details class=\"analysis-entry\"{open_attr}>" |
| | f"<summary>{summary}</summary>\n" |
| | f"{body}\n" |
| | "</details>" |
| | ) |
| |
|
| | def _render_stage_sections(self, body: str, force_open: bool = False) -> str | None: |
| | """Split a multi-stage markdown block into per-stage collapsible sections.""" |
| | lines = body.splitlines() |
| | sections: list[tuple[str, list[str]]] = [] |
| | current_title: str | None = None |
| | current_lines: list[str] = [] |
| | |
| | def _flush(): |
| | nonlocal current_title, current_lines |
| | if current_title is None: |
| | return |
| | sections.append((current_title, list(current_lines))) |
| | current_title = None |
| | current_lines = [] |
| | |
| | for line in lines: |
| | stripped = line.strip() |
| | if stripped.startswith("### "): |
| | _flush() |
| | current_title = stripped.lstrip("# ").strip() |
| | current_lines = [] |
| | continue |
| | if current_title is None: |
| | |
| | if not stripped: |
| | continue |
| | current_title = "Details" |
| | current_lines.append(line) |
| | |
| | _flush() |
| | |
| | if not sections: |
| | return None |
| | |
| | rendered_sections: list[str] = [] |
| | for title, section_lines in sections: |
| | section_body = "\n".join(section_lines).rstrip() |
| | if section_body.endswith("---"): |
| | section_body = section_body[:-3].rstrip() |
| | section_body = section_body.strip() or "_No details available._" |
| | open_attr = " open" if force_open else "" |
| | rendered_sections.append( |
| | f"<details class=\"analysis-stage\"{open_attr}>" |
| | f"<summary>{html.escape(title)}</summary>\n" |
| | f"{section_body}\n" |
| | "</details>" |
| | ) |
| | |
| | return "\n\n".join(rendered_sections) |
| | |
| | def _build_advisor_messages_locked(self) -> list[dict]: |
| | """Assemble advisor messages. Caller must hold _lock.""" |
| | messages: list[dict] = [] |
| | |
| | |
| | if ( |
| | not self._current_cycle_messages |
| | and not self._advisor_history |
| | and not self._is_thinking |
| | ): |
| | messages.append({ |
| | "role": "assistant", |
| | "content": "π Hello! I'm your AI Tactical Advisor.\n\nStart the simulation and I'll analyze the fire situation, describe my reasoning, and recommend tactical deployments.\n\nWatch me think, plan, and execute!" |
| | }) |
| | return messages |
| | |
| | current_entry = self._build_current_cycle_entry_locked() |
| | if current_entry: |
| | messages.append(current_entry) |
| | |
| | return messages |
| |
|
| | def _build_current_cycle_entry_locked(self) -> Optional[dict]: |
| | """Render the active cycle as a single β±οΈ Tick block (even while streaming).""" |
| | cycle_sections: list[str] = [] |
| | stage_placeholders = { |
| | 1: ("π Stage 1 Β· Assessment", "Querying MCP tools and analyzing the situation..."), |
| | 2: ("π― Stage 2 Β· Planning", "Formulating tactical strategy..."), |
| | 3: ("β‘ Stage 3 Β· Execution", "Generating deployment commands via MCP..."), |
| | 4: ("π§ Stage 4 Β· Summary", "Consolidating cycle insights..."), |
| | } |
| | current_stage_title: Optional[str] = None |
| | for msg in self._current_cycle_messages: |
| | content = msg.get("content", "") |
| | if content: |
| | cycle_sections.append(content) |
| | |
| | |
| | if self._is_thinking: |
| | tick = self._thinking_start_tick |
| | if tick is None and self.engine.world: |
| | tick = self.engine.get_state().get("tick", 0) |
| | title, desc = stage_placeholders.get( |
| | self._current_stage, |
| | ("π€ AI Thinking", "Processing...") |
| | ) |
| | current_stage_title = title |
| | cycle_sections.append( |
| | f"### {title} `[Tick {tick if tick is not None else '?'}]`\n\n{desc}" |
| | ) |
| | |
| | if not cycle_sections: |
| | return None |
| | |
| | tick_label = self._thinking_start_tick |
| | if tick_label is None and self.engine.world: |
| | tick_label = self.engine.get_state().get("tick", 0) |
| | summary = f"β±οΈ Tick {tick_label if tick_label is not None else '?'}" |
| | if self._is_thinking and current_stage_title: |
| | summary = f"{summary} Β· {current_stage_title} β³" |
| | body = "\n\n".join(cycle_sections).strip() |
| | open_by_default = tick_label == 0 |
| | |
| | return { |
| | "role": "assistant", |
| | "content": self._wrap_analysis_block( |
| | body, |
| | summary, |
| | split_stage_sections=True, |
| | open_by_default=open_by_default, |
| | ), |
| | "metadata": {"title": summary, "status": "pending" if self._is_thinking else "done"}, |
| | } |
| | |
| | def get_advisor_messages(self) -> list: |
| | """Get current AI advisor cycle messages (progressive stage display).""" |
| | with self._lock: |
| | return self._build_advisor_messages_locked() |
| | |
| | def _build_history_messages_locked(self) -> list[dict]: |
| | """Aggregate advisor history into per-tick chatbot messages.""" |
| | if not self._advisor_history: |
| | return [] |
| | |
| | history_messages: list[dict] = [] |
| | buffer: list[str] = [] |
| | tick_num = "?" |
| | |
| | def flush_cycle(): |
| | nonlocal buffer, tick_num |
| | cycle_text = "\n\n".join(buffer).strip() |
| | if cycle_text: |
| | history_messages.append({ |
| | "role": "assistant", |
| | "content": self._wrap_analysis_block( |
| | cycle_text, |
| | f"β±οΈ Tick {tick_num}", |
| | split_stage_sections=True, |
| | open_by_default=(str(tick_num).strip() == "0"), |
| | ), |
| | "metadata": {"title": f"β±οΈ Tick {tick_num}", "status": "done"}, |
| | }) |
| | buffer = [] |
| | tick_num = "?" |
| | |
| | for msg in self._advisor_history: |
| | content = msg.get("content", "") |
| | if not content: |
| | continue |
| | buffer.append(content) |
| | if tick_num == "?" and "[Tick " in content: |
| | start = content.find("[Tick ") + 6 |
| | end = content.find("]", start) |
| | if end > start: |
| | tick_num = content[start:end] |
| | |
| | lowered = content.lower() |
| | if "stage 4" in lowered: |
| | flush_cycle() |
| | |
| | if buffer: |
| | flush_cycle() |
| | |
| | return history_messages if history_messages else [{ |
| | "role": "assistant", |
| | "content": self._wrap_analysis_block("No previous analysis cycles yet...", "π History"), |
| | "metadata": {"title": "π History", "status": "done"}, |
| | }] |
| | |
| | def get_advisor_markdown(self) -> str: |
| | """Get the latest AI advisor cycle as formatted plain text.""" |
| | with self._lock: |
| | return self._get_advisor_markdown_internal() |
| | |
| | def get_advisor_history_chat_messages(self) -> list[dict]: |
| | """Get advisor history formatted for chatbot display.""" |
| | with self._lock: |
| | return self._build_history_messages_locked() |
| | |
| | |
| | |
| | |
| | |
| | def get_game_changes(self) -> dict: |
| | """ |
| | Check game-critical components for changes (called by game_timer every 1s). |
| | |
| | Returns a dict with: |
| | - state: current simulation state |
| | - grid_changed: bool - whether fire/unit positions changed |
| | - status_changed: bool - whether status bar should update |
| | """ |
| | import hashlib |
| | |
| | with self._lock: |
| | if self.engine.world: |
| | state = self._compose_state_locked() |
| | else: |
| | state = { |
| | "status": "idle", |
| | "after_action_report": self._get_after_action_report_payload_locked(), |
| | } |
| | result = {"state": state} |
| | |
| | |
| | grid_data = { |
| | "fires": sorted([(f["x"], f["y"], round(f["intensity"], 2)) for f in state.get("fires", [])]), |
| | "units": sorted([(u["x"], u["y"], u["type"]) for u in state.get("units", [])]), |
| | "buildings": sorted([(b["x"], b["y"]) for b in state.get("buildings", [])]) |
| | } |
| | current_grid_hash = hashlib.md5(json.dumps(grid_data, sort_keys=True).encode()).hexdigest() |
| | result["grid_changed"] = current_grid_hash != self._last_grid_hash |
| | if result["grid_changed"]: |
| | self._last_grid_hash = current_grid_hash |
| | |
| | |
| | status = state.get("status", "idle") |
| | is_running = self._running |
| | result["status_changed"] = is_running and status == "running" |
| | |
| | return result |
| | |
| | def get_ui_changes(self) -> dict: |
| | """ |
| | Check UI panel components for changes (called by ui_timer every 2s). |
| | |
| | Returns a dict with: |
| | - state: current simulation state |
| | - advisor_changed: bool - whether advisor messages changed |
| | - advisor_messages: list[dict] | None - new content if changed |
| | - history_changed: bool - whether history HTML changed |
| | - advisor_history: str or None - new content if changed |
| | - event_log_changed: bool - whether event log changed |
| | - event_log: str or None - new content if changed |
| | - buttons_changed: bool - whether button states changed |
| | - button_states: tuple (start_enabled, pause_enabled) |
| | - result_changed: bool - whether result popup changed |
| | - result_state: str - current result state |
| | """ |
| | with self._lock: |
| | if self.engine.world: |
| | state = self._compose_state_locked() |
| | else: |
| | state = { |
| | "status": "idle", |
| | "after_action_report": self._get_after_action_report_payload_locked(), |
| | } |
| | result = {"state": state} |
| | |
| | |
| | advisor_messages = self._get_advisor_chat_messages_internal() |
| | signature = tuple(msg.get("content", "") for msg in advisor_messages) |
| | content_changed = signature != self._last_advisor_signature |
| | result["advisor_changed"] = self._is_thinking or content_changed |
| | result["advisor_messages"] = advisor_messages if result["advisor_changed"] else None |
| | if content_changed: |
| | self._last_advisor_signature = signature |
| | |
| | |
| | history_messages = self._build_history_messages_locked() |
| | history_signature = tuple(msg.get("content", "") for msg in history_messages) |
| | result["history_changed"] = history_signature != self._last_history_signature |
| | result["advisor_history"] = history_messages if result["history_changed"] else None |
| | if result["history_changed"]: |
| | self._last_history_signature = history_signature |
| | |
| | |
| | event_log = self._get_event_log_internal() |
| | result["event_log_changed"] = event_log != self._last_event_log |
| | result["event_log"] = event_log if result["event_log_changed"] else None |
| | if result["event_log_changed"]: |
| | self._last_event_log = event_log |
| |
|
| | |
| | if self._mcp_log_dirty: |
| | mcp_log = self._get_mcp_log_text_locked() |
| | result["mcp_log_changed"] = True |
| | result["mcp_log"] = mcp_log |
| | self._last_mcp_log = mcp_log |
| | self._mcp_log_dirty = False |
| | else: |
| | result["mcp_log_changed"] = False |
| | result["mcp_log"] = None |
| | |
| | |
| | status = state.get("status", "idle") |
| | is_running = self._running |
| | is_paused = ( |
| | self.engine.world is not None |
| | and not self._running |
| | and self.engine.world.status == SimulationStatus.RUNNING |
| | ) |
| | start_enabled = is_paused or (not is_running and status in ["idle", "success", "fail"]) |
| | pause_enabled = is_running and status == "running" |
| | current_buttons = (start_enabled, pause_enabled) |
| | result["buttons_changed"] = current_buttons != self._last_button_states |
| | result["button_states"] = current_buttons |
| | if result["buttons_changed"]: |
| | self._last_button_states = current_buttons |
| | |
| | |
| | if self._result_dismissed: |
| | current_result = "" |
| | elif status in ["success", "fail"]: |
| | current_result = status |
| | else: |
| | current_result = "" |
| | result["result_changed"] = current_result != self._last_result_state |
| | result["result_state"] = current_result |
| | if result["result_changed"]: |
| | self._last_result_state = current_result |
| |
|
| | report_payload = self._get_after_action_report_payload_locked() |
| | overlay_payload = { |
| | "outcome": current_result, |
| | "after_action": report_payload, |
| | } |
| | payload_signature = json.dumps(overlay_payload, sort_keys=True) |
| | if payload_signature != self._last_result_payload_signature: |
| | result["result_changed"] = True |
| | self._last_result_payload_signature = payload_signature |
| | result["result_payload"] = overlay_payload |
| | |
| | return result |
| | |
| | def get_changed_components(self) -> dict: |
| | """ |
| | Legacy function - combines game and UI changes. |
| | Used by button click handlers for full refresh. |
| | """ |
| | game = self.get_game_changes() |
| | ui = self.get_ui_changes() |
| | |
| | |
| | return { |
| | "state": game["state"], |
| | "grid_changed": game["grid_changed"], |
| | "status_changed": game["status_changed"], |
| | "advisor_changed": ui["advisor_changed"], |
| | "advisor_messages": ui["advisor_messages"], |
| | "history_changed": ui["history_changed"], |
| | "advisor_history": ui["advisor_history"], |
| | "event_log_changed": ui["event_log_changed"], |
| | "event_log": ui["event_log"], |
| | "mcp_log_changed": ui["mcp_log_changed"], |
| | "mcp_log": ui["mcp_log"], |
| | "buttons_changed": ui["buttons_changed"], |
| | "button_states": ui["button_states"], |
| | "result_changed": ui["result_changed"], |
| | "result_state": ui["result_state"], |
| | "result_payload": ui.get("result_payload"), |
| | } |
| | |
| | def _get_advisor_markdown_internal(self) -> str: |
| | """Internal helper to flatten advisor messages into text.""" |
| | messages = self._build_advisor_messages_locked() |
| | parts = [msg.get("content", "") for msg in messages if msg.get("content")] |
| | return "\n\n---\n\n".join(parts) if parts else "Waiting for analysis..." |
| | |
| | def _get_advisor_chat_messages_internal(self) -> list[dict]: |
| | """Internal helper to normalize advisor messages for Chatbot display.""" |
| | messages = self._build_advisor_messages_locked() |
| | chat_messages: list[dict] = [] |
| | for msg in messages: |
| | role = msg.get("role", "assistant") |
| | if role not in ("user", "assistant", "system"): |
| | role = "assistant" |
| | chat_msg = { |
| | "role": role, |
| | "content": msg.get("content", ""), |
| | } |
| | metadata = msg.get("metadata") |
| | if metadata: |
| | chat_msg["metadata"] = metadata |
| | options = msg.get("options") |
| | if options: |
| | chat_msg["options"] = options |
| | chat_messages.append(chat_msg) |
| | return chat_messages |
| | |
| | def get_advisor_chat_messages(self) -> list[dict]: |
| | """Public helper for UI components that expect message dictionaries.""" |
| | with self._lock: |
| | return self._get_advisor_chat_messages_internal() |
| | |
| | def _get_event_log_internal(self, limit: int = 15) -> str: |
| | """Internal method to get event log (must be called with lock held).""" |
| | lines = [] |
| | event_logs = [log for log in self._logs if log.event_type != "advisor"] |
| | for log in event_logs[-limit:]: |
| | if log.event_type == "deploy": |
| | lines.append(f"[Tick {log.tick}] π {log.message}") |
| | elif log.event_type == "status": |
| | lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
| | elif log.event_type == "error": |
| | lines.append(f"[{log.timestamp}] β οΈ {log.message}") |
| | else: |
| | lines.append(f"[Tick {log.tick}] {log.message}") |
| | return "\n".join(lines) if lines else "No events yet..." |
| | |
| | def get_event_log_text(self, limit: int = 15) -> str: |
| | """Get event logs (deploy, status, error) without AI advisor.""" |
| | with self._lock: |
| | lines = [] |
| | event_logs = [log for log in self._logs if log.event_type != "advisor"] |
| | for log in event_logs[-limit:]: |
| | if log.event_type == "deploy": |
| | lines.append(f"[Tick {log.tick}] π {log.message}") |
| | elif log.event_type == "status": |
| | lines.append(f"[{log.timestamp}] βΉοΈ {log.message}") |
| | elif log.event_type == "error": |
| | lines.append(f"[{log.timestamp}] β οΈ {log.message}") |
| | else: |
| | lines.append(f"[Tick {log.tick}] {log.message}") |
| | return "\n".join(lines) if lines else "No events yet..." |
| | |
| | def get_deploy_log_text(self, limit: int = 10) -> str: |
| | """Get deploy-related logs only (deploy success and errors).""" |
| | with self._lock: |
| | lines = [] |
| | deploy_logs = [log for log in self._logs if log.event_type in ["deploy", "error"]] |
| | for log in deploy_logs[-limit:]: |
| | if log.event_type == "deploy": |
| | lines.append(f"[Tick {log.tick}] β
{log.message}") |
| | elif log.event_type == "error": |
| | lines.append(f"[Tick {log.tick}] β {log.message}") |
| | return "\n".join(lines) if lines else "Click on a cell to deploy units..." |
| | |
| | def deploy_unit(self, unit_type: str, x: int, y: int, source: str = "player") -> dict: |
| | """Deploy a unit (thread-safe).""" |
| | with self._lock: |
| | result = self._call_mcp_tool("deploy_unit", unit_type=unit_type, x=x, y=y, source=source) |
| | |
| | if result.get("status") == "ok": |
| | self._add_log( |
| | "deploy", |
| | f"Deployed {unit_type} at ({x}, {y})", |
| | {"unit": result.get("unit"), "source": source} |
| | ) |
| | if str(source or "").startswith("player"): |
| | unit_label = "fire truck" if unit_type == "fire_truck" else "helicopter" |
| | self._record_player_action( |
| | "deploy_unit", |
| | f"Deployed {unit_label} at ({x}, {y})", |
| | {"unit_type": unit_type, "x": x, "y": y} |
| | ) |
| | else: |
| | self._add_log( |
| | "error", |
| | f"Failed to deploy {unit_type}: {result.get('message')}" |
| | ) |
| | |
| | return result |
| | |
| | def remove_unit(self, x: int, y: int) -> dict: |
| | """Remove a unit at position (thread-safe).""" |
| | with self._lock: |
| | result = self._call_mcp_tool("remove_unit", x=x, y=y) |
| | |
| | if result.get("status") == "ok": |
| | self._add_log( |
| | "deploy", |
| | f"Removed unit at ({x}, {y})", |
| | {"unit": result.get("unit")} |
| | ) |
| | removed_unit = result.get("unit") or { |
| | "type": result.get("removed_unit_type"), |
| | **(result.get("position") or {"x": x, "y": y}), |
| | } |
| | unit_type = removed_unit.get("type") or result.get("removed_unit_type", "") |
| | unit_label = ( |
| | "fire truck" |
| | if unit_type == "fire_truck" |
| | else "helicopter" |
| | if unit_type == "helicopter" |
| | else "unit" |
| | ) |
| | self._record_player_action( |
| | "remove_unit", |
| | f"Removed {unit_label} at ({removed_unit.get('x', x)}, {removed_unit.get('y', y)})", |
| | {"unit": removed_unit} |
| | ) |
| | |
| | return result |
| | |
| | def add_fire(self, x: int, y: int, intensity: float = 0.5) -> dict: |
| | """Add fire at position (thread-safe). For testing purposes.""" |
| | with self._lock: |
| | if self.engine.world is None: |
| | return {"status": "error", "message": "World not initialized"} |
| | |
| | |
| | if not (0 <= x < self.engine.world.width and 0 <= y < self.engine.world.height): |
| | return {"status": "error", "message": f"Position ({x}, {y}) out of bounds"} |
| | |
| | cell = self.engine.world.grid[y][x] |
| | |
| | |
| | if cell.cell_type not in (CellType.FOREST, CellType.BUILDING): |
| | return {"status": "error", "message": "Fire can only be placed on forest or building"} |
| | |
| | |
| | if cell.fire_intensity > 0: |
| | return {"status": "error", "message": "Cannot place fire on existing fire or smoke"} |
| | |
| | |
| | for unit in self.engine.world.units: |
| | if unit.x == x and unit.y == y: |
| | return {"status": "error", "message": "Cannot place fire where a unit exists"} |
| | |
| | |
| | old_intensity = cell.fire_intensity |
| | cell.fire_intensity = min(1.0, max(0.0, intensity)) |
| | |
| | |
| | self.engine.world.calculate_metrics() |
| | |
| | self._add_log( |
| | "fire", |
| | f"π₯ Added fire at ({x}, {y}) with intensity {int(intensity * 100)}%", |
| | {"x": x, "y": y, "intensity": cell.fire_intensity, "old_intensity": old_intensity} |
| | ) |
| | result = {"status": "ok", "x": x, "y": y, "intensity": cell.fire_intensity} |
| | self._record_player_action( |
| | "add_fire", |
| | f"Ignited fire at ({x}, {y}) Β· intensity {int(cell.fire_intensity * 100)}%", |
| | result |
| | ) |
| | return result |
| | |
| | def has_unit_at(self, x: int, y: int) -> bool: |
| | """Check if there's a unit at position.""" |
| | with self._lock: |
| | if self.engine.world is None: |
| | return False |
| | for unit in self.engine.world.units: |
| | if unit.x == x and unit.y == y: |
| | return True |
| | return False |
| | |
| | def is_running(self) -> bool: |
| | """Check if simulation is running.""" |
| | return self._running |
| | |
| | def set_auto_execute(self, enabled: bool): |
| | """Set whether to automatically execute AI recommendations.""" |
| | with self._lock: |
| | self._auto_execute = enabled |
| | |
| | def is_auto_execute(self) -> bool: |
| | """Check if auto-execute is enabled.""" |
| | return self._auto_execute |
| | |
| | def get_advisor_model_choice(self) -> str: |
| | """Return the currently selected advisor model label.""" |
| | with self._lock: |
| | return self._model_choice |
| | |
| | def set_advisor_model_choice(self, choice: str) -> dict: |
| | """ |
| | Switch the advisor backend/model based on UI selection. |
| | Returns status dict usable by the UI for feedback. |
| | """ |
| | preset = ADVISOR_MODEL_CHOICES.get(choice) |
| | if not preset: |
| | return {"status": "error", "message": "Unknown model selection."} |
| | |
| | if preset["provider"] == "openai" and not os.getenv("OPENAI_API_KEY"): |
| | return { |
| | "status": "error", |
| | "message": "Please set OPENAI_API_KEY before selecting an OpenAI model.", |
| | } |
| | |
| | with self._lock: |
| | self.advisor = AdvisorAgent( |
| | provider=preset["provider"], |
| | model=preset["model"], |
| | ) |
| | self._model_choice = choice |
| | self._add_log("status", f"Advisor model switched to {choice}") |
| | |
| | return {"status": "ok", "selection": choice} |
| | |
| | def reset_advisor_model_choice(self) -> str: |
| | """Reset advisor selection back to the default preset.""" |
| | default_choice = DEFAULT_ADVISOR_MODEL_CHOICE |
| | preset = ADVISOR_MODEL_CHOICES[default_choice] |
| | with self._lock: |
| | self.advisor = AdvisorAgent( |
| | provider=preset["provider"], |
| | model=preset["model"], |
| | ) |
| | self._model_choice = default_choice |
| | self._advisor_first_run = True |
| | self._add_log("status", f"Advisor model reset to {default_choice}") |
| | return default_choice |
| | |
| | def is_thinking(self) -> bool: |
| | """Check if AI advisor is currently thinking.""" |
| | return self._is_thinking |
| | |
| | def get_thinking_stage(self) -> int: |
| | """Get current AI thinking stage (0=idle, 1=tool_call, 2=assess, 3=plan, 4=execute).""" |
| | return self._current_stage if self._is_thinking else 0 |
| | |
| | def _simulation_loop(self): |
| | """Background simulation loop.""" |
| | |
| | after_action_context = None |
| |
|
| | while self._running: |
| | sim_should_stop = False |
| | try: |
| | with self._lock: |
| | if self.engine.world is None: |
| | break |
| | |
| | |
| | if self._advisor_first_run: |
| | self._advisor_first_run = False |
| | self._run_advisor(self._get_mcp_world_state()) |
| | |
| | |
| | self.engine.step() |
| | self._tick_count += 1 |
| | |
| | |
| | state = self.engine.get_state() |
| | self._record_tick_metrics_locked(state) |
| | status = state.get("status", "running") |
| | |
| | if status in ["success", "fail"]: |
| | self._add_log( |
| | "status", |
| | "π SUCCESS! Fire contained!" if status == "success" else "π₯ FAILED! Too much damage!" |
| | ) |
| | after_action_context = self._prepare_after_action_context_locked(status, state) |
| | self._running = False |
| | sim_should_stop = True |
| | else: |
| | |
| | if self._tick_count % self.advisor_interval == 0: |
| | self._run_advisor(self._get_mcp_world_state()) |
| | |
| | |
| | if self._on_update: |
| | try: |
| | self._on_update() |
| | except Exception: |
| | pass |
| | |
| | if after_action_context: |
| | self._launch_after_action_report(after_action_context) |
| | after_action_context = None |
| |
|
| | if sim_should_stop: |
| | break |
| |
|
| | |
| | time.sleep(self.tick_interval) |
| | |
| | except Exception as e: |
| | with self._lock: |
| | self._add_log("error", f"Simulation error: {str(e)}") |
| | break |
| | |
| | def _archive_current_cycle_locked(self): |
| | """Move the completed cycle messages into history (caller must hold _lock).""" |
| | if not self._current_cycle_messages: |
| | return |
| | self._advisor_history.extend(self._current_cycle_messages) |
| | if len(self._advisor_history) > 42: |
| | self._advisor_history = self._advisor_history[-39:] |
| | self._current_cycle_messages = [] |
| | |
| | def _run_advisor(self, state: dict): |
| | """ |
| | Run advisor analysis with progressive stage display. |
| | Each stage is shown one at a time: Assessment β Planning β Execution β Summary. |
| | """ |
| | |
| | if self._advisor_running: |
| | return |
| | |
| | self._advisor_running = True |
| | tick = state.get("tick", 0) |
| | |
| | try: |
| | |
| | |
| | |
| | |
| | self._archive_current_cycle_locked() |
| | |
| | |
| | self._current_cycle_messages = [] |
| | self._thinking_start_tick = tick |
| | |
| | |
| | |
| | |
| | self._current_stage = 1 |
| | self._is_thinking = True |
| | |
| | self._lock.release() |
| | try: |
| | assessment = self.advisor.assess(state) |
| | finally: |
| | self._lock.acquire() |
| | |
| | |
| | self._add_assessment_message(assessment, state, tick) |
| | self._is_thinking = False |
| | |
| | |
| | |
| | |
| | self._current_stage = 2 |
| | self._is_thinking = True |
| | |
| | self._lock.release() |
| | try: |
| | plan = self.advisor.plan(state, assessment) |
| | finally: |
| | self._lock.acquire() |
| | |
| | self._add_planning_message(plan, tick) |
| | self._is_thinking = False |
| | |
| | |
| | |
| | |
| | self._current_stage = 3 |
| | self._is_thinking = True |
| | |
| | self._lock.release() |
| | try: |
| | recommendations = self.advisor.execute(state, assessment, plan) |
| | finally: |
| | self._lock.acquire() |
| | |
| | |
| | self._add_execution_message(recommendations, tick) |
| | |
| | |
| | response = self._build_advisor_response(assessment, plan, recommendations) |
| | self._latest_recommendations = response |
| | |
| | |
| | |
| | |
| | self._current_stage = 4 |
| | self._is_thinking = True |
| | |
| | self._lock.release() |
| | try: |
| | cycle_summary = self.advisor.summarize(state, assessment, plan, recommendations, response) |
| | finally: |
| | self._lock.acquire() |
| | |
| | self._is_thinking = False |
| | self._add_summary_cycle_message(cycle_summary, tick) |
| | self._record_cycle_summary(tick, cycle_summary, self.engine.get_state()) |
| | |
| | |
| | |
| | |
| | self._current_stage = 5 |
| | |
| | self._add_log( |
| | "advisor", |
| | response.summary, |
| | { |
| | "recommendations": [r.to_dict() for r in response.recommendations], |
| | "thinking": response.thinking, |
| | "analysis": response.analysis, |
| | "priority": response.priority, |
| | "error": response.error |
| | } |
| | ) |
| | |
| | |
| | if self._auto_execute and response.recommendations: |
| | self._execute_recommendations(response, tick) |
| | |
| | except Exception as e: |
| | self._is_thinking = False |
| | self._current_stage = 0 |
| | self._add_log("error", f"Advisor error: {str(e)}") |
| | |
| | self._current_cycle_messages.append({ |
| | "role": "assistant", |
| | "content": f"β AI Advisor Error: {str(e)}" |
| | }) |
| | finally: |
| | self._advisor_running = False |
| | |
| | self._archive_current_cycle_locked() |
| | |
| | def _add_assessment_message(self, assessment: AssessmentResult, state: dict, tick: int): |
| | """Add the Assessment message (Stage 1) with MCP tool calls to current cycle.""" |
| | |
| | fires = state.get("fires", []) |
| | units = state.get("units", []) |
| | buildings = state.get("buildings", []) |
| | building_integrity = state.get("building_integrity", 1.0) |
| | status = state.get("status", "running") |
| | width = state.get("width", 10) |
| | height = state.get("height", 10) |
| | |
| | |
| | emoji_map = generate_emoji_map(self.engine) |
| | |
| | priority_emoji = { |
| | "CRITICAL": "π΄", |
| | "HIGH": "π ", |
| | "MODERATE": "π‘", |
| | "LOW": "π’" |
| | } |
| | emoji = priority_emoji.get(assessment.threat_level, "βͺ") |
| | |
| | content = f""" |
| | ### π Stage 1 Β· Assessment `[Tick {tick}]` |
| | |
| | #### π§ MCP Tool Calls |
| | |
| | <details> |
| | <summary>π€ <code>mcp.get_world_state()</code></summary> |
| | |
| | ```python |
| | result = mcp.get_world_state() |
| | ``` |
| | |
| | **Response** |
| | ``` |
| | status: {status} | grid: {width}x{height} |
| | fires: {len(fires)} | units: {len(units)}/{state.get('max_units', 10)} |
| | buildings: {len(buildings)} | integrity: {building_integrity:.0%} |
| | ``` |
| | |
| | ``` |
| | {emoji_map} |
| | ``` |
| | |
| | _Legend: π² Forest Β· π’ Building Β· π₯ Fire Β· π¨ Smoke Β· π Truck Β· π Heli_ |
| | </details> |
| | """.strip() |
| | |
| | if assessment.ineffective_units: |
| | idle_lines = [] |
| | for u in assessment.ineffective_units[:5]: |
| | idle_lines.append(f"- {u.get('type', 'unit')} at ({u.get('x', 0)}, {u.get('y', 0)})") |
| | if len(assessment.ineffective_units) > 5: |
| | idle_lines.append(f"- ... and {len(assessment.ineffective_units) - 5} more") |
| | idle_block = "\n".join(idle_lines) |
| | content += f""" |
| | |
| | <details> |
| | <summary>π€ <code>mcp.find_idle_units()</code> β β οΈ {len(assessment.ineffective_units)} idle</summary> |
| | |
| | ```python |
| | {idle_block} |
| | ``` |
| | </details> |
| | """ |
| | else: |
| | content += """ |
| | |
| | <details> |
| | <summary>π€ <code>mcp.find_idle_units()</code> β β
All effective</summary> |
| | |
| | ```python |
| | # Every deployed unit is actively covering a fire |
| | ``` |
| | </details> |
| | """ |
| | |
| | if assessment.uncovered_fires: |
| | fire_lines = [] |
| | for f in assessment.uncovered_fires[:5]: |
| | fire_lines.append(f"- Fire at ({f.get('x', 0)}, {f.get('y', 0)}) Β· intensity={f.get('intensity', 0):.0%}") |
| | if len(assessment.uncovered_fires) > 5: |
| | fire_lines.append(f"- ... and {len(assessment.uncovered_fires) - 5} more") |
| | fire_block = "\n".join(fire_lines) |
| | content += f""" |
| | |
| | <details> |
| | <summary>π€ <code>mcp.find_uncovered_fires()</code> β π¨ {len(assessment.uncovered_fires)} uncovered</summary> |
| | |
| | ```python |
| | {fire_block} |
| | ``` |
| | </details> |
| | """ |
| | else: |
| | content += """ |
| | |
| | <details> |
| | <summary>π€ <code>mcp.find_uncovered_fires()</code> β β
All covered</summary> |
| | |
| | ```python |
| | # All active fires currently have unit coverage |
| | ``` |
| | </details> |
| | """ |
| | |
| | content += f""" |
| | #### π Analysis Results |
| | |
| | **Threat Level:** {emoji} {assessment.threat_level} |
| | |
| | **Fire Analysis** |
| | - Total fires: {assessment.fire_count} |
| | - High intensity (>70%): {len(assessment.high_intensity_fires)} |
| | - Building threats: {len(assessment.building_threats)} |
| | - β οΈ Uncovered fires: {len(assessment.uncovered_fires)} |
| | |
| | **Unit Analysis** |
| | - Deployed: {assessment.unit_count}/{assessment.max_units} |
| | - Effective: {len(assessment.effective_units)} |
| | - Idle: {len(assessment.ineffective_units)} |
| | - Coverage ratio: {assessment.coverage_ratio:.0%} |
| | |
| | **Summary:** {assessment.summary} |
| | """ |
| | content += "\n\n---\n" |
| | self._current_cycle_messages.append({"role": "assistant", "content": content}) |
| | |
| | def _add_planning_message(self, plan, tick: int): |
| | """Add the Planning message (Stage 2) to current cycle.""" |
| | strategy_emoji = { |
| | "deploy_new": "π", |
| | "optimize_existing": "π", |
| | "balanced": "βοΈ", |
| | "monitor": "π" |
| | } |
| | s_emoji = strategy_emoji.get(plan.strategy, "π") |
| | |
| | action_lines = [] |
| | if plan.deploy_count > 0: |
| | action_lines.append(f"- Deploy: {plan.deploy_count} new unit(s)") |
| | if plan.reposition_units: |
| | action_lines.append(f"- Reposition: {len(plan.reposition_units)} idle unit(s)") |
| | if plan.priority_targets: |
| | action_lines.append(f"- Priority fires: {len(plan.priority_targets)}") |
| | |
| | content = f""" |
| | ### π― Stage 2 Β· Planning `[Tick {tick}]` |
| | |
| | **Strategy:** {s_emoji} `{plan.strategy.upper()}` |
| | |
| | **Reasoning** |
| | {plan.reasoning} |
| | """.strip() |
| | |
| | if action_lines: |
| | content += "\n\n**Action Outline**\n" + "\n".join(action_lines) |
| |
|
| | content += "\n\n---\n" |
| | |
| | self._current_cycle_messages.append({"role": "assistant", "content": content}) |
| | |
| | def _add_execution_message(self, recommendations, tick: int): |
| | """Add the Execution message (Stage 3) with MCP tool calls to current cycle.""" |
| | if not recommendations: |
| | content = f"### β‘ Stage 3 Β· Execution `[Tick {tick}]`\n\nβ
**No actions required.** Current deployments already cover every fire." |
| | content += "\n\n---\n" |
| | self._current_cycle_messages.append({"role": "assistant", "content": content}) |
| | self._record_action_breakdown(tick, 0, 0, 0) |
| | return |
| | |
| | move_count = sum(1 for r in recommendations if getattr(r, "action", "deploy") == "move") |
| | replace_count = sum(1 for r in recommendations if getattr(r, "action", "deploy") == "replace") |
| | deploy_count = len(recommendations) - move_count - replace_count |
| | |
| | summary = [] |
| | if move_count: |
| | summary.append(f"- π Reposition {move_count} idle unit(s)") |
| | if replace_count: |
| | summary.append(f"- π Replace {replace_count} unit(s)") |
| | if deploy_count: |
| | summary.append(f"- π Deploy {deploy_count} additional unit(s)") |
| | |
| | content = f"### β‘ Stage 3 Β· Execution `[Tick {tick}]`\n\n" + "\n".join(summary) + "\n\n#### π§ MCP Tool Actions\n" |
| | |
| | for idx, rec in enumerate(recommendations, 1): |
| | unit_emoji = "π" if rec.suggested_unit_type == "fire_truck" else "π" |
| | unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
| | action = getattr(rec, "action", "deploy") |
| | |
| | if action == "move": |
| | source_x = getattr(rec, "source_x", 0) |
| | source_y = getattr(rec, "source_y", 0) |
| | block = f""" |
| | <details> |
| | <summary>{idx}. π Move {unit_name} from ({source_x}, {source_y}) β ({rec.target_x}, {rec.target_y})</summary> |
| | |
| | ```python |
| | mcp.move_unit( |
| | source_x={source_x}, source_y={source_y}, |
| | target_x={rec.target_x}, target_y={rec.target_y} |
| | ) |
| | ``` |
| | |
| | π‘ _{rec.reason}_ |
| | </details> |
| | """ |
| | elif action == "replace": |
| | old_type = getattr(rec, "old_unit_type", "fire_truck") |
| | old_name = "Fire Truck" if old_type == "fire_truck" else "Helicopter" |
| | old_emoji = "π" if old_type == "fire_truck" else "π" |
| | block = f""" |
| | <details> |
| | <summary>{idx}. π Replace {old_name} {old_emoji} with {unit_name} {unit_emoji} at ({rec.target_x}, {rec.target_y})</summary> |
| | |
| | ```python |
| | mcp.replace_unit( |
| | x={rec.target_x}, y={rec.target_y}, |
| | new_unit_type="{rec.suggested_unit_type}" |
| | ) |
| | ``` |
| | |
| | π‘ _{rec.reason}_ |
| | </details> |
| | """ |
| | else: |
| | block = f""" |
| | <details> |
| | <summary>{idx}. {unit_emoji} Deploy {unit_name} to ({rec.target_x}, {rec.target_y})</summary> |
| | |
| | ```python |
| | mcp.deploy_unit( |
| | unit_type="{rec.suggested_unit_type}", |
| | x={rec.target_x}, y={rec.target_y} |
| | ) |
| | ``` |
| | |
| | π‘ _{rec.reason}_ |
| | </details> |
| | """ |
| | content += "\n" + block.strip() + "\n" |
| |
|
| | content += "\n\n---\n" |
| | |
| | self._current_cycle_messages.append({"role": "assistant", "content": content}) |
| | self._record_action_breakdown(tick, deploy_count, move_count, replace_count) |
| | |
| | def _add_summary_cycle_message(self, cycle_summary: CycleSummary, tick: int): |
| | """Add Stage 4 summary message to the current cycle.""" |
| | highlights = "\n".join(f"- {item}" for item in cycle_summary.key_highlights) or "- (none)" |
| | risks = "\n".join(f"- {item}" for item in cycle_summary.risks) or "- (none)" |
| | next_focus = "\n".join(f"- {item}" for item in cycle_summary.next_focus) or "- (none)" |
| | content = f""" |
| | ### π§ Stage 4 Β· Summary `[Tick {tick}]` |
| | |
| | **Headline:** {cycle_summary.headline} |
| | |
| | **Threat Level:** {cycle_summary.threat_level} |
| | |
| | **Key Highlights** |
| | {highlights} |
| | |
| | **Risks / Gaps** |
| | {risks} |
| | |
| | **Next Focus** |
| | {next_focus} |
| | """.strip() |
| | content += "\n\n---\n" |
| | self._current_cycle_messages.append({"role": "assistant", "content": content}) |
| | |
| | def _build_advisor_response(self, assessment: AssessmentResult, plan: PlanResult, recommendations: list) -> AdvisorResponse: |
| | """Build the final AdvisorResponse object.""" |
| | |
| | |
| | thinking_parts = [ |
| | f"π Scanning {assessment.fire_count} active fires...", |
| | ] |
| | if assessment.uncovered_fires: |
| | thinking_parts.append(f"π¨ ALERT: {len(assessment.uncovered_fires)} fire(s) with NO coverage!") |
| | if assessment.building_threats: |
| | thinking_parts.append(f"π’ {len(assessment.building_threats)} fire(s) threatening buildings!") |
| | if assessment.ineffective_units: |
| | thinking_parts.append(f"π {len(assessment.ineffective_units)} idle unit(s) should be repositioned") |
| | thinking_parts.append(f"π― Strategy: {plan.strategy.upper()} - {plan.reasoning}") |
| | |
| | |
| | priority_emoji = {"CRITICAL": "π΄", "HIGH": "π ", "MODERATE": "π‘", "LOW": "π’"} |
| | emoji = priority_emoji.get(assessment.threat_level, "βͺ") |
| | |
| | if assessment.threat_level == "CRITICAL": |
| | summary = f"{emoji} CRITICAL: {assessment.summary}. Immediate action required!" |
| | elif assessment.threat_level == "HIGH": |
| | summary = f"{emoji} HIGH: {assessment.summary}. Rapid response needed." |
| | elif assessment.threat_level == "MODERATE": |
| | summary = f"{emoji} MODERATE: {assessment.summary}. Tactical deployment advised." |
| | else: |
| | summary = f"{emoji} LOW: {assessment.summary}. Monitoring situation." |
| | |
| | return AdvisorResponse( |
| | summary=summary, |
| | recommendations=recommendations, |
| | thinking="\n".join(thinking_parts), |
| | analysis=f"{assessment.fire_count} fires | {assessment.unit_count}/{assessment.max_units} units | {assessment.building_integrity:.0%} building integrity", |
| | priority=assessment.threat_level, |
| | assessment=assessment, |
| | plan=plan |
| | ) |
| | |
| | def _execute_recommendations(self, response: AdvisorResponse, tick: int): |
| | """Execute AI recommendations (must be called with lock held).""" |
| | executed_count = 0 |
| | for rec in response.recommendations: |
| | action = getattr(rec, "action", "deploy") |
| | rec_key = f"{tick}_{action}_{rec.suggested_unit_type}_{rec.target_x}_{rec.target_y}" |
| |
|
| | if rec_key in self._executed_recommendations: |
| | continue |
| |
|
| | if action == "move": |
| | source_x = getattr(rec, "source_x", -1) |
| | source_y = getattr(rec, "source_y", -1) |
| | if source_x < 0 or source_y < 0: |
| | self._add_log("error", f"π€ AI move failed: missing source ({source_x},{source_y})") |
| | continue |
| | result = self._call_mcp_tool( |
| | "move_unit", |
| | source_x=source_x, |
| | source_y=source_y, |
| | target_x=rec.target_x, |
| | target_y=rec.target_y, |
| | ) |
| | if result.get("status") == "ok": |
| | executed_count += 1 |
| | unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
| | self._add_log( |
| | "deploy", |
| | f"π€ AI moved {unit_name}: ({source_x},{source_y}) β ({rec.target_x},{rec.target_y})", |
| | {"source": "ai", "reason": rec.reason, "action": "move"}, |
| | ) |
| | self._executed_recommendations.add(rec_key) |
| | else: |
| | self._add_log( |
| | "error", |
| | f"π€ AI move failed: {result.get('message', 'unknown error')} " |
| | f"(source=({source_x},{source_y}) target=({rec.target_x},{rec.target_y}))", |
| | ) |
| | elif action == "remove": |
| | result = self._call_mcp_tool("remove_unit", x=rec.target_x, y=rec.target_y) |
| | if result.get("status") == "ok": |
| | executed_count += 1 |
| | unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
| | self._add_log( |
| | "deploy", |
| | f"π€ AI removed {unit_name} at ({rec.target_x},{rec.target_y}) - ready to redeploy", |
| | {"source": "ai", "reason": rec.reason, "action": "remove"}, |
| | ) |
| | self._executed_recommendations.add(rec_key) |
| | else: |
| | self._add_log( |
| | "error", |
| | f"π€ AI remove failed: {result.get('message', 'unknown error')} at " |
| | f"({rec.target_x},{rec.target_y})", |
| | ) |
| | else: |
| | result = self._call_mcp_tool( |
| | "deploy_unit", |
| | unit_type=rec.suggested_unit_type, |
| | x=rec.target_x, |
| | y=rec.target_y, |
| | source="ai", |
| | ) |
| | if result.get("status") == "ok": |
| | executed_count += 1 |
| | unit_name = "Fire Truck" if rec.suggested_unit_type == "fire_truck" else "Helicopter" |
| | self._add_log( |
| | "deploy", |
| | f"π€ AI deployed {unit_name} at ({rec.target_x}, {rec.target_y})", |
| | {"source": "ai", "reason": rec.reason, "action": "deploy"}, |
| | ) |
| | self._executed_recommendations.add(rec_key) |
| | else: |
| | self._add_log( |
| | "error", |
| | f"π€ AI deploy failed: {result.get('message', 'unknown error')} " |
| | f"at ({rec.target_x}, {rec.target_y})", |
| | ) |
| |
|
| | if len(self._executed_recommendations) > 100: |
| | self._executed_recommendations = set(list(self._executed_recommendations)[-50:]) |
| | |
| | def _add_log(self, event_type: str, message: str, details: Optional[dict] = None): |
| | """Add a log entry (must be called with lock held).""" |
| | tick = self.engine.world.tick if self.engine.world else 0 |
| | |
| | self._logs.append(LogEntry( |
| | timestamp=datetime.now().strftime("%H:%M:%S"), |
| | tick=tick, |
| | event_type=event_type, |
| | message=message, |
| | details=details |
| | )) |
| | |
| | |
| | if len(self._logs) > 200: |
| | self._logs = self._logs[-100:] |
| |
|
| |
|
| | |
| | _service: Optional[SimulationService] = None |
| |
|
| |
|
| | def get_service() -> SimulationService: |
| | """Get or create the global simulation service.""" |
| | global _service |
| | if _service is None: |
| | _service = SimulationService() |
| | return _service |
| |
|
| |
|