| """ |
| SUMO Traffic Simulation Interface |
| ================================== |
| Handles integration with SUMO for 2D traffic simulation. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import subprocess |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
| import xml.etree.ElementTree as ET |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| from config import ( |
| SUMO_CONFIG, |
| SUMO_NETWORKS_DIR, |
| SUMO_OUTPUT_DIR, |
| CASE_STUDY_LOCATION |
| ) |
|
|
| |
# Make the Python tools bundled with a SUMO installation importable when the
# SUMO_HOME environment variable is set.
SUMO_HOME = os.environ.get("SUMO_HOME", "")
if SUMO_HOME:
    sys.path.append(os.path.join(SUMO_HOME, "tools"))


# traci is optional: record whether it could be imported so callers can
# fall back to batch execution.
try:
    import traci
except ImportError:
    TRACI_AVAILABLE = False
    print("Warning: traci not available. SUMO simulation will be limited.")
else:
    TRACI_AVAILABLE = True


# sumolib is optional as well; only its availability is recorded here.
try:
    import sumolib
except ImportError:
    SUMOLIB_AVAILABLE = False
    print("Warning: sumolib not available.")
else:
    SUMOLIB_AVAILABLE = True
|
|
|
|
class SUMOSimulator:
    """
    SUMO Traffic Simulator wrapper for accident reconstruction.

    Generates the network, route and configuration files for a two-vehicle
    roundabout scenario and runs them through SUMO, either step by step via
    TraCI or, when TraCI is unavailable, as a plain ``sumo`` subprocess.
    """

    def __init__(self, network_path: Optional[str] = None):
        """
        Initialize the SUMO simulator.

        Creates the network and output directories if they do not exist.

        Args:
            network_path: Path to SUMO network file (.net.xml)
        """
        self.network_path = network_path
        # State placeholders; within this class only network_path is
        # updated after construction.
        self.simulation_running = False
        self.vehicles = {}
        self.collision_detected = False
        self.collision_data = None

        SUMO_NETWORKS_DIR.mkdir(parents=True, exist_ok=True)
        SUMO_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    def create_network_from_osm(
        self,
        osm_file: str,
        output_prefix: str = "network"
    ) -> str:
        """
        Convert OSM data to SUMO network format.

        Runs ``netconvert`` on the OSM file. If netconvert is missing or
        exits with an error, the built-in roundabout network is generated
        instead.

        Args:
            osm_file: Path to OSM file
            output_prefix: Prefix for output files

        Returns:
            Path to generated network file
        """
        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"

        netconvert_cmd = [
            "netconvert",
            "--osm-files", osm_file,
            "--output-file", str(output_net),
            "--geometry.remove", "true",
            "--junctions.join", "true",
            "--tls.guess", "true",
            "--roundabouts.guess", "true",
        ]

        try:
            subprocess.run(netconvert_cmd, check=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            print(f"netconvert failed: {e}")
            # Fall back to the synthetic roundabout so callers still get a
            # network path.
            return self.create_simple_network(output_prefix)

        print(f"Network created: {output_net}")
        self.network_path = str(output_net)
        return str(output_net)

    def create_simple_network(
        self,
        output_prefix: str = "simple_network",
        location: Dict = None
    ) -> str:
        """
        Create a simple SUMO network for a roundabout.

        Writes node, edge and connection files and asks ``netconvert`` to
        assemble them. If netconvert is missing or fails, a hand-written
        minimal network file is written instead.

        Args:
            output_prefix: Prefix for output files
            location: Location dictionary with coordinates
                (defaults to CASE_STUDY_LOCATION)

        Returns:
            Path to generated network file
        """
        if location is None:
            location = CASE_STUDY_LOCATION

        sources = {
            "nod": self._create_nodes_xml(location),
            "edg": self._create_edges_xml(),
            "con": self._create_connections_xml(),
        }
        paths = {}
        for kind, xml_text in sources.items():
            path = SUMO_NETWORKS_DIR / f"{output_prefix}.{kind}.xml"
            # The XML declaration says UTF-8, so write UTF-8 regardless of
            # the platform default encoding.
            path.write_text(xml_text, encoding="utf-8")
            paths[kind] = path

        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"
        netconvert_cmd = [
            "netconvert",
            "--node-files", str(paths["nod"]),
            "--edge-files", str(paths["edg"]),
            "--connection-files", str(paths["con"]),
            "--output-file", str(output_net),
        ]

        try:
            subprocess.run(netconvert_cmd, check=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # Report the failure (as create_network_from_osm does) before
            # falling back to the hand-written network.
            print(f"netconvert failed: {e}")
            return self._create_minimal_network_xml(output_prefix)

        self.network_path = str(output_net)
        return str(output_net)

    def _create_nodes_xml(self, location: Dict) -> str:
        """
        Create SUMO nodes XML for a roundabout.

        The layout uses fixed local x/y coordinates centred on the
        roundabout; ``location`` is accepted for interface compatibility
        but does not currently influence the generated nodes.

        Args:
            location: Location dictionary with coordinates (unused)

        Returns:
            Nodes file content as a string
        """
        nodes = '''<?xml version="1.0" encoding="UTF-8"?>
<nodes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/nodes_file.xsd">
    <!-- Roundabout center -->
    <node id="center" x="0" y="0" type="priority"/>

    <!-- Roundabout nodes -->
    <node id="r_n" x="0" y="30" type="priority"/>
    <node id="r_e" x="30" y="0" type="priority"/>
    <node id="r_s" x="0" y="-30" type="priority"/>
    <node id="r_w" x="-30" y="0" type="priority"/>

    <!-- Approach nodes -->
    <node id="a_n" x="0" y="150" type="priority"/>
    <node id="a_e" x="150" y="0" type="priority"/>
    <node id="a_s" x="0" y="-150" type="priority"/>
    <node id="a_w" x="-150" y="0" type="priority"/>
</nodes>'''
        return nodes

    def _create_edges_xml(self) -> str:
        """
        Create SUMO edges XML for a roundabout.

        Defines four circulating edges, four incoming approach edges and
        four outgoing exit edges, all with two lanes.

        Returns:
            Edges file content as a string
        """
        edges = '''<?xml version="1.0" encoding="UTF-8"?>
<edges xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/edges_file.xsd">
    <!-- Roundabout edges (clockwise) -->
    <edge id="r_n_e" from="r_n" to="r_e" numLanes="2" speed="8.33"/>
    <edge id="r_e_s" from="r_e" to="r_s" numLanes="2" speed="8.33"/>
    <edge id="r_s_w" from="r_s" to="r_w" numLanes="2" speed="8.33"/>
    <edge id="r_w_n" from="r_w" to="r_n" numLanes="2" speed="8.33"/>

    <!-- Approach roads (incoming) -->
    <edge id="in_n" from="a_n" to="r_n" numLanes="2" speed="13.89"/>
    <edge id="in_e" from="a_e" to="r_e" numLanes="2" speed="13.89"/>
    <edge id="in_s" from="a_s" to="r_s" numLanes="2" speed="13.89"/>
    <edge id="in_w" from="a_w" to="r_w" numLanes="2" speed="13.89"/>

    <!-- Exit roads (outgoing) -->
    <edge id="out_n" from="r_n" to="a_n" numLanes="2" speed="13.89"/>
    <edge id="out_e" from="r_e" to="a_e" numLanes="2" speed="13.89"/>
    <edge id="out_s" from="r_s" to="a_s" numLanes="2" speed="13.89"/>
    <edge id="out_w" from="r_w" to="a_w" numLanes="2" speed="13.89"/>
</edges>'''
        return edges

    def _create_connections_xml(self) -> str:
        """
        Create SUMO connections XML.

        Links every approach edge into the roundabout, chains the
        circulating edges, and lets each circulating edge leave through
        the exit at its end node.

        Returns:
            Connections file content as a string
        """
        connections = '''<?xml version="1.0" encoding="UTF-8"?>
<connections xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/connections_file.xsd">
    <!-- Entry to roundabout connections -->
    <connection from="in_n" to="r_n_e"/>
    <connection from="in_e" to="r_e_s"/>
    <connection from="in_s" to="r_s_w"/>
    <connection from="in_w" to="r_w_n"/>

    <!-- Roundabout circulation -->
    <connection from="r_n_e" to="r_e_s"/>
    <connection from="r_e_s" to="r_s_w"/>
    <connection from="r_s_w" to="r_w_n"/>
    <connection from="r_w_n" to="r_n_e"/>

    <!-- Exit from roundabout -->
    <connection from="r_n_e" to="out_e"/>
    <connection from="r_e_s" to="out_s"/>
    <connection from="r_s_w" to="out_w"/>
    <connection from="r_w_n" to="out_n"/>
</connections>'''
        return connections

    def _create_minimal_network_xml(self, output_prefix: str) -> str:
        """
        Create a minimal network XML directly (fallback).

        Used when netconvert is unavailable. Writes a hand-written
        ``.net.xml`` with the same edge and junction IDs as the generated
        roundabout and stores its path in ``self.network_path``.

        Args:
            output_prefix: Prefix for the output file

        Returns:
            Path to the written network file
        """
        # NOTE(review): this file declares edges and junctions but no <lane>
        # elements or internal connections; confirm that SUMO can actually
        # load it before relying on this fallback.
        network_xml = '''<?xml version="1.0" encoding="UTF-8"?>
<net version="1.9" junctionCornerDetail="5" limitTurnSpeed="5.50" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/net_file.xsd">

    <location netOffset="0.00,0.00" convBoundary="-150,-150,150,150" origBoundary="-150,-150,150,150" projParameter="!"/>

    <edge id="in_n" from="a_n" to="r_n" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_e" from="a_e" to="r_e" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_s" from="a_s" to="r_s" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_w" from="a_w" to="r_w" priority="1" numLanes="2" speed="13.89" length="120"/>

    <edge id="out_n" from="r_n" to="a_n" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_e" from="r_e" to="a_e" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_s" from="r_s" to="a_s" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_w" from="r_w" to="a_w" priority="1" numLanes="2" speed="13.89" length="120"/>

    <edge id="r_n_e" from="r_n" to="r_e" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_e_s" from="r_e" to="r_s" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_s_w" from="r_s" to="r_w" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_w_n" from="r_w" to="r_n" priority="2" numLanes="2" speed="8.33" length="47"/>

    <junction id="a_n" type="dead_end" x="0.00" y="150.00"/>
    <junction id="a_e" type="dead_end" x="150.00" y="0.00"/>
    <junction id="a_s" type="dead_end" x="0.00" y="-150.00"/>
    <junction id="a_w" type="dead_end" x="-150.00" y="0.00"/>

    <junction id="r_n" type="priority" x="0.00" y="30.00"/>
    <junction id="r_e" type="priority" x="30.00" y="0.00"/>
    <junction id="r_s" type="priority" x="0.00" y="-30.00"/>
    <junction id="r_w" type="priority" x="-30.00" y="0.00"/>

</net>'''

        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"
        output_net.write_text(network_xml, encoding="utf-8")

        self.network_path = str(output_net)
        print(f"Created minimal network: {output_net}")
        return str(output_net)

    def create_route_file(
        self,
        vehicle_1_route: List[str],
        vehicle_2_route: List[str],
        vehicle_1_speed: float = 50,
        vehicle_2_speed: float = 50,
        output_prefix: str = "routes"
    ) -> str:
        """
        Create a SUMO route file for two vehicles.

        Both vehicles depart at time 0 with their given speed, which is
        also used as the maximum speed of their vehicle type.

        Args:
            vehicle_1_route: List of edge IDs for vehicle 1
            vehicle_2_route: List of edge IDs for vehicle 2
            vehicle_1_speed: Speed of vehicle 1 in km/h
            vehicle_2_speed: Speed of vehicle 2 in km/h
            output_prefix: Prefix for output file

        Returns:
            Path to route file
        """
        # SUMO works in m/s; the public interface takes km/h.
        v1_speed_ms = vehicle_1_speed / 3.6
        v2_speed_ms = vehicle_2_speed / 3.6

        v1_edges = ' '.join(vehicle_1_route)
        v2_edges = ' '.join(vehicle_2_route)

        # NOTE(review): departSpeed may exceed the speed limit of the first
        # edge (13.89 m/s on the generated approaches) for speeds above
        # 50 km/h; confirm how SUMO handles that for these scenarios.
        routes_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/routes_file.xsd">

    <!-- Vehicle types -->
    <vType id="car1" accel="2.6" decel="4.5" sigma="0.5" length="4.5" maxSpeed="{v1_speed_ms}" color="1,0,0"/>
    <vType id="car2" accel="2.6" decel="4.5" sigma="0.5" length="4.5" maxSpeed="{v2_speed_ms}" color="0,0,1"/>

    <!-- Routes -->
    <route id="route1" edges="{v1_edges}"/>
    <route id="route2" edges="{v2_edges}"/>

    <!-- Vehicles -->
    <vehicle id="vehicle_1" type="car1" route="route1" depart="0" departSpeed="{v1_speed_ms}"/>
    <vehicle id="vehicle_2" type="car2" route="route2" depart="0" departSpeed="{v2_speed_ms}"/>

</routes>'''

        routes_path = SUMO_NETWORKS_DIR / f"{output_prefix}.rou.xml"
        routes_path.write_text(routes_xml, encoding="utf-8")

        print(f"Routes file created: {routes_path}")
        return str(routes_path)

    def create_config_file(
        self,
        network_file: str,
        route_file: str,
        output_prefix: str = "simulation"
    ) -> str:
        """
        Create a SUMO configuration file.

        Timing, collision handling and the random seed are taken from
        SUMO_CONFIG; trip-info and collision outputs are written to
        SUMO_OUTPUT_DIR using ``output_prefix``.

        Args:
            network_file: Path to network file
            route_file: Path to route file
            output_prefix: Prefix for output files

        Returns:
            Path to configuration file
        """
        end_time = SUMO_CONFIG['simulation_duration']
        step_length = SUMO_CONFIG['step_length']
        collision_action = SUMO_CONFIG['collision_action']
        random_seed = SUMO_CONFIG['random_seed']

        config_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/sumoConfiguration.xsd">

    <input>
        <net-file value="{network_file}"/>
        <route-files value="{route_file}"/>
    </input>

    <time>
        <begin value="0"/>
        <end value="{end_time}"/>
        <step-length value="{step_length}"/>
    </time>

    <output>
        <tripinfo-output value="{SUMO_OUTPUT_DIR}/{output_prefix}_tripinfo.xml"/>
        <collision-output value="{SUMO_OUTPUT_DIR}/{output_prefix}_collisions.xml"/>
    </output>

    <processing>
        <collision.action value="{collision_action}"/>
        <collision.check-junctions value="true"/>
    </processing>

    <random>
        <seed value="{random_seed}"/>
    </random>

</configuration>'''

        config_path = SUMO_NETWORKS_DIR / f"{output_prefix}.sumocfg"
        # Paths may contain non-ASCII characters; keep the bytes consistent
        # with the declared UTF-8 encoding.
        config_path.write_text(config_xml, encoding="utf-8")

        print(f"Configuration file created: {config_path}")
        return str(config_path)

    def run_simulation(
        self,
        config_file: str,
        gui: bool = False
    ) -> Dict:
        """
        Run a SUMO simulation.

        With TraCI available the simulation is stepped until no vehicles
        are expected any more, recording position and speed of
        ``vehicle_1`` and ``vehicle_2`` after every step and the first
        step at which SUMO reports colliding vehicles. Without TraCI the
        call is delegated to the batch runner.

        Args:
            config_file: Path to SUMO configuration file
            gui: Whether to run with GUI

        Returns:
            Dictionary containing simulation results. On failure the
            partial results are returned with an additional ``"error"``
            entry holding the error message.
        """
        if not TRACI_AVAILABLE:
            print("TraCI not available. Running simulation without real-time control.")
            return self._run_simulation_batch(config_file)

        results = {
            "steps": 0,
            "collision_detected": False,
            "collision_time": None,
            "collision_position": None,
            "vehicle_1_trajectory": [],
            "vehicle_2_trajectory": [],
            "vehicle_1_speeds": [],
            "vehicle_2_speeds": []
        }
        tracked = (
            ("vehicle_1", "vehicle_1_trajectory", "vehicle_1_speeds"),
            ("vehicle_2", "vehicle_2_trajectory", "vehicle_2_speeds"),
        )

        step = 0
        try:
            sumo_binary = "sumo-gui" if gui else "sumo"
            traci.start([sumo_binary, "-c", config_file])

            while traci.simulation.getMinExpectedNumber() > 0:
                traci.simulationStep()

                # Query the active vehicles once per step.
                active = set(traci.vehicle.getIDList())
                for vehicle_id, trajectory_key, speed_key in tracked:
                    if vehicle_id in active:
                        results[trajectory_key].append(
                            traci.vehicle.getPosition(vehicle_id)
                        )
                        results[speed_key].append(
                            traci.vehicle.getSpeed(vehicle_id)
                        )

                # Keep the FIRST collision only; later steps in which the
                # vehicles still overlap must not overwrite time/position.
                if (
                    not results["collision_detected"]
                    and traci.simulation.getCollidingVehiclesIDList()
                ):
                    results["collision_detected"] = True
                    results["collision_time"] = step * SUMO_CONFIG["step_length"]
                    if results["vehicle_1_trajectory"]:
                        results["collision_position"] = results["vehicle_1_trajectory"][-1]

                step += 1

        except Exception as e:
            # Best effort: report the problem and return what was recorded.
            print(f"Simulation error: {e}")
            results["error"] = str(e)
        finally:
            # Report the steps actually completed, even after an error.
            results["steps"] = step
            try:
                if traci.isLoaded():
                    traci.close()
            except Exception as close_error:
                print(f"Failed to close TraCI connection: {close_error}")

        return results

    @staticmethod
    def _output_reports_collision(*streams: Optional[str]) -> bool:
        """Return True if any captured output stream mentions a collision."""
        return any("collision" in (stream or "").lower() for stream in streams)

    def _run_simulation_batch(self, config_file: str) -> Dict:
        """
        Run simulation in batch mode without TraCI.

        Args:
            config_file: Path to SUMO configuration file

        Returns:
            Dictionary with the nominal step count, a collision flag
            derived from SUMO's console output, and the captured stdout
            and stderr; or ``{"error": "SUMO not installed"}`` when the
            ``sumo`` binary cannot be found.
        """
        try:
            completed = subprocess.run(
                ["sumo", "-c", config_file],
                capture_output=True,
                text=True
            )
        except FileNotFoundError:
            print("SUMO not found. Please install SUMO.")
            return {"error": "SUMO not installed"}

        return {
            "steps": SUMO_CONFIG["simulation_duration"] / SUMO_CONFIG["step_length"],
            # SUMO normally writes warnings (including collision warnings)
            # to stderr, so both streams are inspected.
            "collision_detected": self._output_reports_collision(
                completed.stdout, completed.stderr
            ),
            "stdout": completed.stdout,
            "stderr": completed.stderr
        }
|
|
|
|
def create_simulation_for_scenario(
    scenario: Dict,
    vehicle_1: Dict,
    vehicle_2: Dict,
    scenario_id: int = 1
) -> Dict:
    """
    Create and run a SUMO simulation for a specific accident scenario.

    Builds the roundabout network, a route file and a configuration file,
    all prefixed with ``scenario_<scenario_id>``, then runs the simulation
    without GUI. Each vehicle's route is picked from its ``"direction"``
    entry; unknown or missing directions fall back to "north" for
    vehicle 1 and "east" for vehicle 2. Speeds default to 50.

    Args:
        scenario: Scenario dictionary from AI analysis (not read by this
            function)
        vehicle_1: Vehicle 1 data
        vehicle_2: Vehicle 2 data
        scenario_id: Unique identifier for this scenario

    Returns:
        Simulation results dictionary
    """
    prefix = f"scenario_{scenario_id}"

    sim = SUMOSimulator()
    net_file = sim.create_simple_network(prefix)

    # Each approach direction maps to a half-lap through the roundabout.
    routes_by_direction = {
        "north": ["in_n", "r_n_e", "r_e_s", "out_s"],
        "south": ["in_s", "r_s_w", "r_w_n", "out_n"],
        "east": ["in_e", "r_e_s", "r_s_w", "out_w"],
        "west": ["in_w", "r_w_n", "r_n_e", "out_e"]
    }

    first_route = routes_by_direction.get(
        vehicle_1.get("direction", "north"),
        routes_by_direction["north"],
    )
    second_route = routes_by_direction.get(
        vehicle_2.get("direction", "east"),
        routes_by_direction["east"],
    )

    rou_file = sim.create_route_file(
        vehicle_1_route=first_route,
        vehicle_2_route=second_route,
        vehicle_1_speed=vehicle_1.get("speed", 50),
        vehicle_2_speed=vehicle_2.get("speed", 50),
        output_prefix=prefix
    )

    cfg_file = sim.create_config_file(
        network_file=net_file,
        route_file=rou_file,
        output_prefix=prefix
    )

    return sim.run_simulation(cfg_file, gui=False)
|
|
|
|
if __name__ == "__main__":
    # Smoke test: generate network, route and config files (no simulation run).
    print("Testing SUMO simulation setup...")

    simulator = SUMOSimulator()

    network = simulator.create_simple_network("test_network")
    print(f"Network created: {network}")

    # Vehicle 1 enters from the north, vehicle 2 from the east.
    north_entry_route = ["in_n", "r_n_e", "r_e_s", "out_s"]
    east_entry_route = ["in_e", "r_e_s", "r_s_w", "out_w"]
    routes = simulator.create_route_file(
        vehicle_1_route=north_entry_route,
        vehicle_2_route=east_entry_route,
        vehicle_1_speed=50,
        vehicle_2_speed=60,
        output_prefix="test"
    )

    config = simulator.create_config_file(network, routes, "test")

    print("\nSUMO setup complete!")
    for label, generated_path in (
        ("Network", network),
        ("Routes", routes),
        ("Config", config),
    ):
        print(f"{label}: {generated_path}")
|
|