| | """ |
| | SUMO Traffic Simulation Interface |
| | ================================== |
| | Handles integration with SUMO for 2D traffic simulation. |
| | """ |
| |
|
| | import os |
| | import sys |
| | import json |
| | import subprocess |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Tuple |
| | import xml.etree.ElementTree as ET |
| |
|
| | |
| | sys.path.insert(0, str(Path(__file__).parent.parent)) |
| |
|
| | from config import ( |
| | SUMO_CONFIG, |
| | SUMO_NETWORKS_DIR, |
| | SUMO_OUTPUT_DIR, |
| | CASE_STUDY_LOCATION |
| | ) |
| |
|
| | |
# When SUMO_HOME is set, add its "tools" directory to the import path so the
# optional imports below can be resolved from there.
SUMO_HOME = os.environ.get("SUMO_HOME", "")
if SUMO_HOME:
    sys.path.append(os.path.join(SUMO_HOME, "tools"))

# traci is optional: record whether it could be imported so callers can
# fall back to running SUMO without step-by-step control.
try:
    import traci
except ImportError:
    TRACI_AVAILABLE = False
    print("Warning: traci not available. SUMO simulation will be limited.")
else:
    TRACI_AVAILABLE = True

# sumolib is optional as well; only its availability is recorded here.
try:
    import sumolib
except ImportError:
    SUMOLIB_AVAILABLE = False
    print("Warning: sumolib not available.")
else:
    SUMOLIB_AVAILABLE = True
| |
|
| |
|
class SUMOSimulator:
    """
    SUMO Traffic Simulator wrapper for accident reconstruction.

    Generates the network, route and configuration files for a two-vehicle
    roundabout scenario and runs them through SUMO, either step by step via
    TraCI or as a single batch process when TraCI could not be imported.
    """

    def __init__(self, network_path: Optional[str] = None):
        """
        Initialize the SUMO simulator.

        Args:
            network_path: Path to SUMO network file (.net.xml)
        """
        self.network_path = network_path
        self.simulation_running = False
        self.vehicles = {}
        self.collision_detected = False
        self.collision_data = None

        # Generated files are written below these directories, so make sure
        # they exist before any of the create_* methods run.
        SUMO_NETWORKS_DIR.mkdir(parents=True, exist_ok=True)
        SUMO_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _xml_attr(value) -> str:
        """Return ``str(value)`` escaped for use inside a double-quoted XML attribute."""
        # Imported locally so this class does not depend on a new file-level import.
        from xml.sax.saxutils import escape

        return escape(str(value), {'"': "&quot;"})

    def _run_netconvert(self, netconvert_cmd: List[str]) -> bool:
        """Run netconvert; return True on success, False after reporting a failure."""
        try:
            subprocess.run(netconvert_cmd, check=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # CalledProcessError: netconvert exited non-zero.
            # FileNotFoundError: the netconvert executable is not on PATH.
            print(f"netconvert failed: {e}")
            return False
        return True

    def create_network_from_osm(
        self,
        osm_file: str,
        output_prefix: str = "network"
    ) -> str:
        """
        Convert OSM data to SUMO network format.

        If netconvert fails or is not installed, the built-in simple
        roundabout network is generated instead (see create_simple_network).

        Args:
            osm_file: Path to OSM file
            output_prefix: Prefix for output files

        Returns:
            Path to generated network file
        """
        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"

        netconvert_cmd = [
            "netconvert",
            "--osm-files", str(osm_file),
            "--output-file", str(output_net),
            "--geometry.remove", "true",
            "--junctions.join", "true",
            "--tls.guess", "true",
            "--roundabouts.guess", "true"
        ]

        if not self._run_netconvert(netconvert_cmd):
            # Fall back to the hand-written roundabout under the same prefix.
            return self.create_simple_network(output_prefix)

        print(f"Network created: {output_net}")
        self.network_path = str(output_net)
        return str(output_net)

    def create_simple_network(
        self,
        output_prefix: str = "simple_network",
        location: Dict = None
    ) -> str:
        """
        Create a simple SUMO network for a roundabout.

        Writes node, edge and connection description files and asks
        netconvert to compile them. If netconvert fails or is missing, a
        hand-written network file is written instead.

        Args:
            output_prefix: Prefix for output files
            location: Location dictionary with coordinates; defaults to
                CASE_STUDY_LOCATION

        Returns:
            Path to generated network file
        """
        if location is None:
            location = CASE_STUDY_LOCATION

        nodes_path = SUMO_NETWORKS_DIR / f"{output_prefix}.nod.xml"
        edges_path = SUMO_NETWORKS_DIR / f"{output_prefix}.edg.xml"
        connections_path = SUMO_NETWORKS_DIR / f"{output_prefix}.con.xml"

        # The XML declarations say UTF-8, so write the files as UTF-8
        # instead of relying on the platform's default encoding.
        nodes_path.write_text(self._create_nodes_xml(location), encoding="utf-8")
        edges_path.write_text(self._create_edges_xml(), encoding="utf-8")
        connections_path.write_text(self._create_connections_xml(), encoding="utf-8")

        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"
        netconvert_cmd = [
            "netconvert",
            "--node-files", str(nodes_path),
            "--edge-files", str(edges_path),
            "--connection-files", str(connections_path),
            "--output-file", str(output_net)
        ]

        if not self._run_netconvert(netconvert_cmd):
            # Last resort: write a network file directly, without netconvert.
            return self._create_minimal_network_xml(output_prefix)

        self.network_path = str(output_net)
        return str(output_net)

    def _create_nodes_xml(self, location: Dict) -> str:
        """
        Create SUMO nodes XML for a roundabout.

        The layout is fixed: four ring nodes 30 units from the origin, four
        approach nodes 150 units from the origin, and a "center" node that
        no edge generated by this class references.

        Args:
            location: Accepted for interface compatibility; the coordinates
                are hard-coded and this argument is not read.

        Returns:
            Nodes file content as a string
        """
        return '''<?xml version="1.0" encoding="UTF-8"?>
<nodes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/nodes_file.xsd">
    <!-- Roundabout center -->
    <node id="center" x="0" y="0" type="priority"/>

    <!-- Roundabout nodes -->
    <node id="r_n" x="0" y="30" type="priority"/>
    <node id="r_e" x="30" y="0" type="priority"/>
    <node id="r_s" x="0" y="-30" type="priority"/>
    <node id="r_w" x="-30" y="0" type="priority"/>

    <!-- Approach nodes -->
    <node id="a_n" x="0" y="150" type="priority"/>
    <node id="a_e" x="150" y="0" type="priority"/>
    <node id="a_s" x="0" y="-150" type="priority"/>
    <node id="a_w" x="-150" y="0" type="priority"/>
</nodes>'''

    def _create_edges_xml(self) -> str:
        """
        Create SUMO edges XML for a roundabout.

        Defines four two-lane ring edges plus one incoming and one outgoing
        two-lane edge per approach.

        Returns:
            Edges file content as a string
        """
        # The ring runs r_n -> r_e -> r_s -> r_w -> r_n, which is clockwise
        # for the node coordinates used by _create_nodes_xml.
        # NOTE(review): clockwise circulation corresponds to left-hand
        # traffic - confirm this matches the case-study location.
        return '''<?xml version="1.0" encoding="UTF-8"?>
<edges xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/edges_file.xsd">
    <!-- Roundabout edges (clockwise) -->
    <edge id="r_n_e" from="r_n" to="r_e" numLanes="2" speed="8.33"/>
    <edge id="r_e_s" from="r_e" to="r_s" numLanes="2" speed="8.33"/>
    <edge id="r_s_w" from="r_s" to="r_w" numLanes="2" speed="8.33"/>
    <edge id="r_w_n" from="r_w" to="r_n" numLanes="2" speed="8.33"/>

    <!-- Approach roads (incoming) -->
    <edge id="in_n" from="a_n" to="r_n" numLanes="2" speed="13.89"/>
    <edge id="in_e" from="a_e" to="r_e" numLanes="2" speed="13.89"/>
    <edge id="in_s" from="a_s" to="r_s" numLanes="2" speed="13.89"/>
    <edge id="in_w" from="a_w" to="r_w" numLanes="2" speed="13.89"/>

    <!-- Exit roads (outgoing) -->
    <edge id="out_n" from="r_n" to="a_n" numLanes="2" speed="13.89"/>
    <edge id="out_e" from="r_e" to="a_e" numLanes="2" speed="13.89"/>
    <edge id="out_s" from="r_s" to="a_s" numLanes="2" speed="13.89"/>
    <edge id="out_w" from="r_w" to="a_w" numLanes="2" speed="13.89"/>
</edges>'''

    def _create_connections_xml(self) -> str:
        """
        Create SUMO connections XML.

        Each incoming edge connects to the ring edge leaving its junction;
        each ring edge connects to the next ring edge and to the exit at
        the junction where it ends.

        Returns:
            Connections file content as a string
        """
        return '''<?xml version="1.0" encoding="UTF-8"?>
<connections xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/connections_file.xsd">
    <!-- Entry to roundabout connections -->
    <connection from="in_n" to="r_n_e"/>
    <connection from="in_e" to="r_e_s"/>
    <connection from="in_s" to="r_s_w"/>
    <connection from="in_w" to="r_w_n"/>

    <!-- Roundabout circulation -->
    <connection from="r_n_e" to="r_e_s"/>
    <connection from="r_e_s" to="r_s_w"/>
    <connection from="r_s_w" to="r_w_n"/>
    <connection from="r_w_n" to="r_n_e"/>

    <!-- Exit from roundabout -->
    <connection from="r_n_e" to="out_e"/>
    <connection from="r_e_s" to="out_s"/>
    <connection from="r_s_w" to="out_w"/>
    <connection from="r_w_n" to="out_n"/>
</connections>'''

    def _create_minimal_network_xml(self, output_prefix: str) -> str:
        """
        Create a minimal network XML directly (fallback).

        Used when netconvert is unavailable. Writes a fixed network file
        with the same edge and junction IDs as the netconvert inputs and
        stores its path in ``self.network_path``.

        Args:
            output_prefix: Prefix for the output file

        Returns:
            Path to the written network file
        """
        # NOTE(review): this document contains only <edge> and <junction>
        # elements - no <lane> or <connection> entries. SUMO may refuse to
        # load it; confirm against a real SUMO installation.
        network_xml = '''<?xml version="1.0" encoding="UTF-8"?>
<net version="1.9" junctionCornerDetail="5" limitTurnSpeed="5.50" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/net_file.xsd">

    <location netOffset="0.00,0.00" convBoundary="-150,-150,150,150" origBoundary="-150,-150,150,150" projParameter="!"/>

    <edge id="in_n" from="a_n" to="r_n" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_e" from="a_e" to="r_e" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_s" from="a_s" to="r_s" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="in_w" from="a_w" to="r_w" priority="1" numLanes="2" speed="13.89" length="120"/>

    <edge id="out_n" from="r_n" to="a_n" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_e" from="r_e" to="a_e" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_s" from="r_s" to="a_s" priority="1" numLanes="2" speed="13.89" length="120"/>
    <edge id="out_w" from="r_w" to="a_w" priority="1" numLanes="2" speed="13.89" length="120"/>

    <edge id="r_n_e" from="r_n" to="r_e" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_e_s" from="r_e" to="r_s" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_s_w" from="r_s" to="r_w" priority="2" numLanes="2" speed="8.33" length="47"/>
    <edge id="r_w_n" from="r_w" to="r_n" priority="2" numLanes="2" speed="8.33" length="47"/>

    <junction id="a_n" type="dead_end" x="0.00" y="150.00"/>
    <junction id="a_e" type="dead_end" x="150.00" y="0.00"/>
    <junction id="a_s" type="dead_end" x="0.00" y="-150.00"/>
    <junction id="a_w" type="dead_end" x="-150.00" y="0.00"/>

    <junction id="r_n" type="priority" x="0.00" y="30.00"/>
    <junction id="r_e" type="priority" x="30.00" y="0.00"/>
    <junction id="r_s" type="priority" x="0.00" y="-30.00"/>
    <junction id="r_w" type="priority" x="-30.00" y="0.00"/>

</net>'''

        output_net = SUMO_NETWORKS_DIR / f"{output_prefix}.net.xml"
        output_net.write_text(network_xml, encoding="utf-8")

        self.network_path = str(output_net)
        print(f"Created minimal network: {output_net}")
        return str(output_net)

    def create_route_file(
        self,
        vehicle_1_route: List[str],
        vehicle_2_route: List[str],
        vehicle_1_speed: float = 50,
        vehicle_2_speed: float = 50,
        output_prefix: str = "routes"
    ) -> str:
        """
        Create a SUMO route file for two vehicles.

        Both vehicles depart at time 0. Each vehicle's speed is used both as
        its type's maxSpeed and as its departSpeed.

        Args:
            vehicle_1_route: List of edge IDs for vehicle 1
            vehicle_2_route: List of edge IDs for vehicle 2
            vehicle_1_speed: Speed of vehicle 1 in km/h
            vehicle_2_speed: Speed of vehicle 2 in km/h
            output_prefix: Prefix for output file

        Returns:
            Path to route file
        """
        # SUMO expects metres per second; the arguments are km/h.
        v1_speed_ms = vehicle_1_speed / 3.6
        v2_speed_ms = vehicle_2_speed / 3.6

        # Edge IDs end up inside an XML attribute, so escape them.
        route1_edges = self._xml_attr(' '.join(vehicle_1_route))
        route2_edges = self._xml_attr(' '.join(vehicle_2_route))

        routes_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/routes_file.xsd">

    <!-- Vehicle types -->
    <vType id="car1" accel="2.6" decel="4.5" sigma="0.5" length="4.5" maxSpeed="{v1_speed_ms}" color="1,0,0"/>
    <vType id="car2" accel="2.6" decel="4.5" sigma="0.5" length="4.5" maxSpeed="{v2_speed_ms}" color="0,0,1"/>

    <!-- Routes -->
    <route id="route1" edges="{route1_edges}"/>
    <route id="route2" edges="{route2_edges}"/>

    <!-- Vehicles -->
    <vehicle id="vehicle_1" type="car1" route="route1" depart="0" departSpeed="{v1_speed_ms}"/>
    <vehicle id="vehicle_2" type="car2" route="route2" depart="0" departSpeed="{v2_speed_ms}"/>

</routes>'''

        routes_path = SUMO_NETWORKS_DIR / f"{output_prefix}.rou.xml"
        routes_path.write_text(routes_xml, encoding="utf-8")

        print(f"Routes file created: {routes_path}")
        return str(routes_path)

    def create_config_file(
        self,
        network_file: str,
        route_file: str,
        output_prefix: str = "simulation"
    ) -> str:
        """
        Create a SUMO configuration file.

        Timing, collision handling and the random seed are taken from
        SUMO_CONFIG; trip-info and collision outputs are directed into
        SUMO_OUTPUT_DIR.

        Args:
            network_file: Path to network file
            route_file: Path to route file
            output_prefix: Prefix for output files

        Returns:
            Path to configuration file
        """
        # File-system paths may contain characters such as "&" that are not
        # valid verbatim inside an XML attribute, so escape every path.
        net_value = self._xml_attr(network_file)
        route_value = self._xml_attr(route_file)
        tripinfo_value = self._xml_attr(f"{SUMO_OUTPUT_DIR}/{output_prefix}_tripinfo.xml")
        collision_value = self._xml_attr(f"{SUMO_OUTPUT_DIR}/{output_prefix}_collisions.xml")

        end_time = SUMO_CONFIG['simulation_duration']
        step_length = SUMO_CONFIG['step_length']
        collision_action = SUMO_CONFIG['collision_action']
        seed = SUMO_CONFIG['random_seed']

        config_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/sumoConfiguration.xsd">

    <input>
        <net-file value="{net_value}"/>
        <route-files value="{route_value}"/>
    </input>

    <time>
        <begin value="0"/>
        <end value="{end_time}"/>
        <step-length value="{step_length}"/>
    </time>

    <output>
        <tripinfo-output value="{tripinfo_value}"/>
        <collision-output value="{collision_value}"/>
    </output>

    <processing>
        <collision.action value="{collision_action}"/>
        <collision.check-junctions value="true"/>
    </processing>

    <random>
        <seed value="{seed}"/>
    </random>

</configuration>'''

        config_path = SUMO_NETWORKS_DIR / f"{output_prefix}.sumocfg"
        config_path.write_text(config_xml, encoding="utf-8")

        print(f"Configuration file created: {config_path}")
        return str(config_path)

    def run_simulation(
        self,
        config_file: str,
        gui: bool = False
    ) -> Dict:
        """
        Run a SUMO simulation.

        With TraCI available, the simulation is stepped until SUMO expects
        no more vehicles, recording the position and speed of "vehicle_1"
        and "vehicle_2" after every step and the first reported collision.
        Without TraCI, the simulation runs as a batch process instead.

        Errors raised while the simulation is running are printed and the
        results gathered so far are returned; "steps" then holds the number
        of steps completed before the error.

        Args:
            config_file: Path to SUMO configuration file
            gui: Whether to run with GUI

        Returns:
            Dictionary containing simulation results
        """
        if not TRACI_AVAILABLE:
            print("TraCI not available. Running simulation without real-time control.")
            return self._run_simulation_batch(config_file)

        results = {
            "steps": 0,
            "collision_detected": False,
            "collision_time": None,
            "collision_position": None,
            "vehicle_1_trajectory": [],
            "vehicle_2_trajectory": [],
            "vehicle_1_speeds": [],
            "vehicle_2_speeds": []
        }

        # (vehicle id, trajectory key, speed key) for each tracked vehicle.
        tracked = (
            ("vehicle_1", "vehicle_1_trajectory", "vehicle_1_speeds"),
            ("vehicle_2", "vehicle_2_trajectory", "vehicle_2_speeds"),
        )

        try:
            sumo_binary = "sumo-gui" if gui else "sumo"
            traci.start([sumo_binary, "-c", config_file])

            while traci.simulation.getMinExpectedNumber() > 0:
                traci.simulationStep()
                results["steps"] += 1

                # Fetch the active vehicle list once per step.
                active_ids = set(traci.vehicle.getIDList())
                for vehicle_id, trajectory_key, speed_key in tracked:
                    if vehicle_id in active_ids:
                        results[trajectory_key].append(traci.vehicle.getPosition(vehicle_id))
                        results[speed_key].append(traci.vehicle.getSpeed(vehicle_id))

                # Keep only the FIRST collision: a collision can be reported
                # again on later steps, which must not overwrite its time.
                if not results["collision_detected"]:
                    if traci.simulation.getCollidingVehiclesIDList():
                        results["collision_detected"] = True
                        # Ask SUMO for the current simulation time rather
                        # than deriving it from a local step counter.
                        results["collision_time"] = traci.simulation.getTime()
                        if results["vehicle_1_trajectory"]:
                            # Last recorded position of vehicle_1 stands in
                            # for the collision location.
                            results["collision_position"] = results["vehicle_1_trajectory"][-1]

        except Exception as e:
            # Best effort: report the problem and return what was collected.
            print(f"Simulation error: {e}")
        finally:
            # Always release the TraCI connection, on success and on error.
            if traci.isLoaded():
                traci.close()

        return results

    def _run_simulation_batch(self, config_file: str) -> Dict:
        """
        Run simulation in batch mode without TraCI.

        Collision detection here is a text heuristic: it looks for the word
        "collision" in SUMO's console output.

        Args:
            config_file: Path to SUMO configuration file

        Returns:
            Dictionary with the nominal step count, the collision flag, the
            captured stdout/stderr and the process return code, or
            ``{"error": "SUMO not installed"}`` if the sumo executable
            could not be found.
        """
        try:
            result = subprocess.run(
                ["sumo", "-c", config_file],
                capture_output=True,
                text=True
            )
        except FileNotFoundError:
            print("SUMO not found. Please install SUMO.")
            return {"error": "SUMO not installed"}

        # Search both streams. NOTE(review): SUMO is believed to emit
        # collision warnings on stderr - confirm for the installed version.
        console_output = f"{result.stdout}\n{result.stderr}".lower()

        return {
            # Nominal step count from the configuration, as a whole number.
            "steps": round(SUMO_CONFIG["simulation_duration"] / SUMO_CONFIG["step_length"]),
            "collision_detected": "collision" in console_output,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }
| |
|
| |
|
def create_simulation_for_scenario(
    scenario: Dict,
    vehicle_1: Dict,
    vehicle_2: Dict,
    scenario_id: int = 1
) -> Dict:
    """
    Create and run a SUMO simulation for a specific accident scenario.

    Builds the simple roundabout network, a route file for the two vehicles
    and a configuration file (all prefixed ``scenario_<scenario_id>``), then
    runs the simulation without the GUI.

    Args:
        scenario: Scenario dictionary from AI analysis (currently not read
            by this function)
        vehicle_1: Vehicle 1 data; the "direction" and "speed" keys are
            used. Direction defaults to "north", speed to 50.
        vehicle_2: Vehicle 2 data; the "direction" and "speed" keys are
            used. Direction defaults to "east", speed to 50.
        scenario_id: Unique identifier for this scenario

    Returns:
        Simulation results dictionary
    """
    simulator = SUMOSimulator()
    prefix = f"scenario_{scenario_id}"

    network_path = simulator.create_simple_network(prefix)

    # Route through the roundabout for each approach direction: enter,
    # travel two ring edges, leave on the opposite side.
    direction_routes = {
        "north": ["in_n", "r_n_e", "r_e_s", "out_s"],
        "south": ["in_s", "r_s_w", "r_w_n", "out_n"],
        "east": ["in_e", "r_e_s", "r_s_w", "out_w"],
        "west": ["in_w", "r_w_n", "r_n_e", "out_e"]
    }

    def pick_route(vehicle: Dict, default_direction: str) -> List[str]:
        """Return the route for the vehicle's direction, or the default route."""
        direction = vehicle.get("direction", default_direction)
        if isinstance(direction, str):
            # Accept "North", " east " and similar spellings instead of
            # silently falling back to the default route.
            direction = direction.strip().lower()
        return direction_routes.get(direction, direction_routes[default_direction])

    def pick_speed(vehicle: Dict) -> float:
        """Return the vehicle's speed, treating a missing or None value as 50."""
        speed = vehicle.get("speed")
        return 50 if speed is None else speed

    route_path = simulator.create_route_file(
        vehicle_1_route=pick_route(vehicle_1, "north"),
        vehicle_2_route=pick_route(vehicle_2, "east"),
        vehicle_1_speed=pick_speed(vehicle_1),
        vehicle_2_speed=pick_speed(vehicle_2),
        output_prefix=prefix
    )

    config_path = simulator.create_config_file(
        network_file=network_path,
        route_file=route_path,
        output_prefix=prefix
    )

    return simulator.run_simulation(config_path, gui=False)
| |
|
| |
|
if __name__ == "__main__":
    # Manual smoke test: generate the network, route and configuration
    # files for a sample scenario and report where they were written.
    # The simulation itself is not run here.
    print("Testing SUMO simulation setup...")

    sim = SUMOSimulator()

    net_file = sim.create_simple_network("test_network")
    print(f"Network created: {net_file}")

    # Vehicle 1 enters from the north at 50 km/h, vehicle 2 from the east
    # at 60 km/h.
    north_route = ["in_n", "r_n_e", "r_e_s", "out_s"]
    east_route = ["in_e", "r_e_s", "r_s_w", "out_w"]
    route_file = sim.create_route_file(north_route, east_route, 50, 60, "test")

    cfg_file = sim.create_config_file(
        network_file=net_file,
        route_file=route_file,
        output_prefix="test"
    )

    print("\nSUMO setup complete!")
    summary = (
        ("Network", net_file),
        ("Routes", route_file),
        ("Config", cfg_file),
    )
    for label, path in summary:
        print(f"{label}: {path}")
| |
|