| | """ |
| | Fire-Rescue - Simulation Core |
| | |
| | Handles fire spread, unit behavior, and win/lose conditions. |
| | """ |
| |
|
| | import random |
| | from typing import Optional |
| |
|
| | from config import range_text |
| | from models import ( |
| | WorldState, |
| | Cell, |
| | CellType, |
| | Unit, |
| | UnitType, |
| | SimulationStatus, |
| | Event, |
| | ) |
| |
|
# Range descriptions for the like-named SimulationEngine.reset() parameters,
# looked up once at import time through config.range_text(). They are referenced
# by reset()'s parameter documentation.
# NOTE(review): presumably human-readable strings such as "1-10" -- confirm
# against config.range_text().
FIRE_COUNT_RANGE_TEXT = range_text("fire_count")
BUILDING_COUNT_RANGE_TEXT = range_text("building_count")
MAX_UNITS_RANGE_TEXT = range_text("max_units")
| |
|
| |
|
class SimulationConfig:
    """Configuration parameters for the simulation.

    All values are class-level constants read by SimulationEngine through
    ``self.config.<NAME>``.
    """

    # --- Grid dimensions, in cells ---
    GRID_WIDTH: int = 10
    GRID_HEIGHT: int = 10

    # --- Fire dynamics ---
    # Multiplied by a burning cell's intensity to get the per-tick chance of
    # igniting each orthogonal neighbour.
    FIRE_SPREAD_CHANCE: float = 0.08
    # Intensity added to every burning cell each tick.
    FIRE_GROWTH_RATE: float = 0.02
    # Upper bound applied to intensity after growth.
    FIRE_MAX_INTENSITY: float = 1.0
    # NOTE(review): not referenced by SimulationEngine as written in this
    # module -- confirm whether it is used elsewhere.
    FIRE_DECAY_RATE: float = 0.01

    # --- Damage ---
    # Scaled by fire intensity and added to a burning, non-empty cell's damage
    # each tick (damage is capped at 1.0 by the engine).
    DAMAGE_PER_TICK: float = 0.01

    # --- Units ---
    # Ranges are square radii around the unit (dx and dy both within +/- range).
    # A fire truck lowers only the single most intense fire in range by its power.
    FIRE_TRUCK_RANGE: int = 1
    FIRE_TRUCK_POWER: float = 0.4
    # A helicopter lowers every burning cell in range by its power.
    HELICOPTER_RANGE: int = 2
    HELICOPTER_POWER: float = 0.25
    # Cooldown assigned to a unit after it fights a fire; the unit skips its
    # action while the cooldown counts down.
    UNIT_COOLDOWN: int = 1

    # --- End conditions ---
    # Fail when building_integrity < 1 - BUILDING_DAMAGE_THRESHOLD.
    BUILDING_DAMAGE_THRESHOLD: float = 0.5
    # Fail when forest_burn_ratio > FOREST_DAMAGE_THRESHOLD.
    FOREST_DAMAGE_THRESHOLD: float = 0.8
    # Succeed when every remaining fire is below this intensity.
    FIRE_SAFE_THRESHOLD: float = 0.1
| |
|
| |
|
class SimulationEngine:
    """
    Core simulation engine that manages world state updates.

    Call reset() to build a world, then step() to advance it one tick at a
    time. Units are added with deploy_unit() and removed with
    remove_unit_at(); get_state() returns the world as a dictionary.
    """

    # Orthogonal (4-neighbour) offsets along which fire can spread.
    _SPREAD_OFFSETS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, config: Optional[SimulationConfig] = None):
        """Create an engine; no world exists until reset() is called.

        Args:
            config: Configuration to use. A default SimulationConfig is
                created when omitted.
        """
        self.config = config if config is not None else SimulationConfig()
        self.world: Optional[WorldState] = None

    def reset(
        self,
        seed: Optional[int] = None,
        fire_count: int = 4,
        fire_intensity: float = 0.6,
        building_count: int = 16,
        max_units: int = 10
    ) -> WorldState:
        """
        Reset and initialize a new simulation.

        Args:
            seed: Random seed for reproducibility (forwarded to
                WorldState.initialize_grid)
            fire_count: Number of initial fire points (valid range: see the
                module-level FIRE_COUNT_RANGE_TEXT)
            fire_intensity: Initial fire intensity (0.0-1.0)
            building_count: Number of buildings to place (valid range: see
                the module-level BUILDING_COUNT_RANGE_TEXT)
            max_units: Maximum number of deployable units (valid range: see
                the module-level MAX_UNITS_RANGE_TEXT)

        Returns:
            The newly created world, which is also stored on ``self.world``.
        """
        # The docstring above is intentionally a plain string literal: an
        # f-string in that position is not a docstring (reset.__doc__ would
        # be None) and would be rebuilt and discarded on every call.
        #
        # NOTE(review): _update_fire() draws from the global ``random``
        # module; whether ``seed`` also seeds that generator depends on
        # initialize_grid() -- confirm if reproducible runs are required.
        self.world = WorldState(
            width=self.config.GRID_WIDTH,
            height=self.config.GRID_HEIGHT,
            tick=0,
            status=SimulationStatus.RUNNING,
            max_ticks=200,
            max_units=max_units
        )

        self.world.initialize_grid(
            seed=seed,
            fire_count=fire_count,
            fire_intensity=fire_intensity,
            building_count=building_count
        )
        self.world.calculate_metrics()

        return self.world

    def step(self) -> WorldState:
        """Advance simulation by one tick.

        Returns:
            The world after the tick. A world that is no longer RUNNING is
            returned unchanged.

        Raises:
            RuntimeError: If reset() has not been called yet.
        """
        if self.world is None:
            raise RuntimeError("Simulation not initialized. Call reset() first.")

        if self.world.status != SimulationStatus.RUNNING:
            return self.world

        # Units act first, so fires they put out this tick neither grow nor
        # spread in the fire update that follows.
        self._update_units()
        self._update_fire()
        self._update_damage()

        # Metrics are refreshed before the end-condition check reads them.
        self.world.calculate_metrics()
        self.world.tick += 1
        self._check_end_conditions()

        return self.world

    def _update_fire(self):
        """Update fire spread and growth."""
        config = self.config
        # Ignitions are collected and applied after the sweep so a cell that
        # catches fire this tick does not itself grow or spread this tick.
        new_fires: list[tuple[int, int, float]] = []

        for row in self.world.grid:
            for cell in row:
                if cell.fire_intensity <= 0:
                    continue

                # Grow the existing fire, capped at the configured maximum.
                cell.fire_intensity = min(
                    cell.fire_intensity + config.FIRE_GROWTH_RATE,
                    config.FIRE_MAX_INTENSITY
                )

                # Try to ignite each orthogonal neighbour.
                for dx, dy in self._SPREAD_OFFSETS:
                    nx, ny = cell.x + dx, cell.y + dy
                    neighbor = self.world.get_cell(nx, ny)

                    if neighbor and neighbor.fire_intensity == 0 and not neighbor.is_destroyed():
                        # Stronger fires are more likely to spread.
                        spread_chance = config.FIRE_SPREAD_CHANCE * cell.fire_intensity
                        if random.random() < spread_chance:
                            # A new fire starts at half the source intensity.
                            new_fires.append((nx, ny, cell.fire_intensity * 0.5))

        for x, y, intensity in new_fires:
            cell = self.world.get_cell(x, y)
            # If several sources ignited the same cell, the first one wins.
            if cell and cell.fire_intensity == 0:
                cell.fire_intensity = intensity

    def _update_damage(self):
        """Update damage caused by fire."""
        for row in self.world.grid:
            for cell in row:
                if cell.fire_intensity > 0 and cell.cell_type != CellType.EMPTY:
                    # Damage scales with intensity and saturates at 1.0.
                    damage = self.config.DAMAGE_PER_TICK * cell.fire_intensity
                    cell.damage = min(cell.damage + damage, 1.0)

    def _update_units(self):
        """Update unit actions (firefighting)."""
        for unit in self.world.units:
            # A unit on cooldown only counts down this tick.
            if unit.cooldown > 0:
                unit.cooldown -= 1
                continue

            extinguished = False

            if unit.unit_type == UnitType.FIRE_TRUCK:
                extinguished = self._fire_truck_action(unit)
            elif unit.unit_type == UnitType.HELICOPTER:
                extinguished = self._helicopter_action(unit)

            # Cooldown is only charged when the unit actually fought a fire.
            if extinguished:
                unit.cooldown = self.config.UNIT_COOLDOWN

    def _burning_cells_in_range(self, unit: Unit, radius: int) -> list:
        """Return the burning cells within a square radius of the unit, in scan order."""
        offsets = range(-radius, radius + 1)
        burning = []
        for dx in offsets:
            for dy in offsets:
                cell = self.world.get_cell(unit.x + dx, unit.y + dy)
                if cell and cell.fire_intensity > 0:
                    burning.append(cell)
        return burning

    def _fire_truck_action(self, unit: Unit) -> bool:
        """Fire truck extinguishes fires within a square radius (Chebyshev distance).

        Only the most intense burning cell in range is targeted.

        Returns:
            True if a fire was in range and was reduced, False otherwise.
        """
        targets = self._burning_cells_in_range(unit, self.config.FIRE_TRUCK_RANGE)
        if not targets:
            return False

        # max() returns the first maximal element, which is the same cell the
        # previous stable descending sort placed first.
        target = max(targets, key=lambda c: c.fire_intensity)
        target.fire_intensity = max(0, target.fire_intensity - self.config.FIRE_TRUCK_POWER)

        return True

    def _helicopter_action(self, unit: Unit) -> bool:
        """Helicopter extinguishes fires within a wider square radius.

        Every burning cell in range is reduced by the helicopter's power.

        Returns:
            True if at least one burning cell was in range, False otherwise.
        """
        targets = self._burning_cells_in_range(unit, self.config.HELICOPTER_RANGE)
        for cell in targets:
            cell.fire_intensity = max(0, cell.fire_intensity - self.config.HELICOPTER_POWER)

        return bool(targets)

    def _end_simulation(self, status, reason: str) -> None:
        """Set the final status and record a "simulation_end" event with the reason."""
        self.world.status = status
        self.world.recent_events.append(Event(
            tick=self.world.tick,
            event_type="simulation_end",
            details={"reason": reason}
        ))

    def _check_end_conditions(self):
        """Check win/lose conditions.

        Conditions are evaluated in priority order and the first match ends
        the simulation: time limit, building loss, forest loss, then success.
        """
        world = self.world
        config = self.config

        # Lose: ran out of time.
        if world.tick >= world.max_ticks:
            self._end_simulation(SimulationStatus.FAIL, "time_limit_exceeded")
            return

        # Lose: too much building damage.
        if world.building_integrity < (1 - config.BUILDING_DAMAGE_THRESHOLD):
            self._end_simulation(SimulationStatus.FAIL, "building_destroyed")
            return

        # Lose: too much forest burned.
        if world.forest_burn_ratio > config.FOREST_DAMAGE_THRESHOLD:
            self._end_simulation(SimulationStatus.FAIL, "forest_destroyed")
            return

        # Win: no fires left, or all remaining fires are below the safe level.
        fires = world.get_fires()
        if not fires or all(f.intensity < config.FIRE_SAFE_THRESHOLD for f in fires):
            self._end_simulation(SimulationStatus.SUCCESS, "fire_contained")

    def deploy_unit(
        self,
        unit_type: str,
        x: int,
        y: int,
        source: str = "player"
    ) -> dict:
        """Deploy a new unit at the specified position.

        Args:
            unit_type: Value convertible to UnitType.
            x: Target column.
            y: Target row.
            source: Who deployed the unit; passed through to the world.

        Returns:
            ``{"status": "ok", "unit": ...}`` on success, otherwise
            ``{"status": "error", "message": ...}``. Validation failures are
            reported in the result rather than raised.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}

        if self.world.status != SimulationStatus.RUNNING:
            return {"status": "error", "message": "Simulation is not running"}

        # Validate the unit type.
        try:
            utype = UnitType(unit_type)
        except ValueError:
            return {"status": "error", "message": f"Invalid unit type: {unit_type}"}

        # Validate the position.
        if not (0 <= x < self.world.width and 0 <= y < self.world.height):
            return {"status": "error", "message": f"Position ({x}, {y}) is out of bounds"}

        cell = self.world.get_cell(x, y)
        if cell and cell.fire_intensity > 0:
            return {"status": "error", "message": f"Cannot deploy on burning cell at ({x}, {y})"}

        if cell and cell.cell_type == CellType.BUILDING:
            return {"status": "error", "message": f"Cannot deploy on building at ({x}, {y})"}

        # Enforce the unit cap.
        if len(self.world.units) >= self.world.max_units:
            return {"status": "error", "message": f"Unit limit reached ({self.world.max_units})"}

        unit = self.world.add_unit(utype, x, y, source)

        # The world may still refuse the deployment by returning None.
        if unit is None:
            return {"status": "error", "message": "Failed to deploy unit"}

        return {
            "status": "ok",
            "unit": unit.to_dict()
        }

    def remove_unit_at(self, x: int, y: int) -> dict:
        """Remove a unit at the specified position.

        Only the first unit found at (x, y) is removed.

        Returns:
            ``{"status": "ok", "message": ..., "unit": ...}`` on success,
            otherwise ``{"status": "error", "message": ...}``.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}

        unit_to_remove = next(
            (unit for unit in self.world.units if unit.x == x and unit.y == y),
            None,
        )

        if unit_to_remove is None:
            return {"status": "error", "message": f"No unit at ({x}, {y})"}

        self.world.units.remove(unit_to_remove)

        return {
            "status": "ok",
            "message": f"Removed {unit_to_remove.unit_type.value} at ({x}, {y})",
            "unit": unit_to_remove.to_dict()
        }

    def get_state(self) -> dict:
        """Get current world state as dictionary.

        Returns an error dictionary when the simulation has not been
        initialized.
        """
        if self.world is None:
            return {"status": "error", "message": "Simulation not initialized"}

        return self.world.to_dict()
| |
|
| |
|