# Hugging Face Space (status at capture time: Running)
| import gradio as gr | |
| import time | |
| import random | |
# Simulated vehicle whose values feed the dynamic PID responses
class VehicleState:
    """Mutable state of the simulated vehicle and of the emulated ELM327 adapter."""

    def __init__(self):
        # --- vehicle values ---
        self.engine_running = True
        self.rpm = 800  # idle RPM
        self.speed = 0
        self.coolant_temp = 20  # degrees Celsius
        self.throttle_pos = 0
        self.maf_rate = 250  # 2.5 g/s at idle (stored scaled by 100)
        self.fuel_level = 75  # percent
        self.update_counter = 0
        # --- ELM327 adapter settings ---
        self.echo_on = True
        self.linefeed_on = True
        self.headers_on = False
        self.spaces_on = True
        self.protocol = "AUTO"
        self.voltage = 13.05

    def update(self):
        """Advance the simulation by one tick.

        The tick counter always increases; every other value only changes
        while the engine is running.
        """
        self.update_counter += 1
        tick = self.update_counter

        if self.engine_running:
            # Idle RPM wanders deterministically within 750..849.
            self.rpm = 750 + (tick * 7) % 100

            # Coolant warms by one degree every 10th tick until it reaches 90.
            if tick % 10 == 0 and self.coolant_temp < 90:
                self.coolant_temp += 1

            # Every 50th tick pick a new throttle position; speed tracks it.
            if tick % 50 == 0:
                self.throttle_pos = (tick // 5) % 30
                self.speed = 2 * self.throttle_pos

            # Mass air flow roughly follows the throttle position.
            self.maf_rate = 250 + 5 * self.throttle_pos
# Global vehicle state shared by every command handler in this module.
# handle_at_command() rebinds this name to a fresh VehicleState on ATZ / ATD.
vehicle_state = VehicleState()
# Pool of possible DTCs for testing.
# Format: (code_bytes, description), where code_bytes are the two hex bytes
# sent on the wire for that trouble code.
# Every entry must be unique: handlers draw from this pool with
# random.sample(), so a duplicated entry could appear twice in one reply.
DTC_POOL = [
    ("01 71", "P0171 - System Too Lean (Bank 1)"),
    ("03 00", "P0300 - Random/Multiple Cylinder Misfire Detected"),
    ("04 42", "P0442 - EVAP Emission Control System Leak Detected (small leak)"),
    ("01 31", "P0131 - O2 Sensor Circuit Low Voltage (Bank 1, Sensor 1)"),
    ("01 33", "P0133 - O2 Sensor Circuit Slow Response (Bank 1, Sensor 1)"),
    ("01 72", "P0172 - System Too Rich (Bank 1)"),
    ("02 02", "P0202 - Injector Circuit Malfunction - Cylinder 2"),
    ("03 20", "P0320 - Ignition/Distributor Engine Speed Input Circuit Malfunction"),
    ("04 20", "P0420 - Catalyst System Efficiency Below Threshold (Bank 1)"),
    ("05 00", "P0500 - Vehicle Speed Sensor Malfunction"),
    ("07 05", "P0705 - Transmission Range Sensor Circuit Malfunction (PRNDL Input)"),
    ("13 00", "P1300 - Igniter Circuit Malfunction"),
    ("00 16", "P0016 - Crankshaft Position/Camshaft Position Correlation (Bank 1)"),
    ("01 28", "P0128 - Coolant Thermostat (Coolant Temp Below Thermostat Regulating Temp)"),
]
# Mode 01 PID table (static responses), keyed by two-digit hex PID.
# Each value is the full reply: "41", the PID, then the data bytes.
# NOTE(review): the supported-PID bitmasks (PIDs 00/20/40) do not exactly
# mirror the PIDs served by this simulator (e.g. 0A and 2F are served but not
# advertised) - confirm whether that is intentional.
MODE01_PID_TABLE = {
    "00": "41 00 BE 3F B8 13",  # Supported PIDs 01-20
    "01": "41 01 83 07 65 04",  # Monitor status (MIL on, 3 DTCs)
    "03": "41 03 02 00",  # Fuel system status (closed loop)
    "06": "41 06 80",  # Short term fuel trim
    "07": "41 07 80",  # Long term fuel trim
    "0A": "41 0A B4",  # Fuel pressure (540 kPa)
    "0B": "41 0B 63",  # Intake manifold pressure (99 kPa)
    "0E": "41 0E 9C",  # Timing advance (14 degrees: 0x9C / 2 - 64)
    "0F": "41 0F 54",  # Intake air temperature (44°C)
    "13": "41 13 03",  # O2 sensors present
    "1C": "41 1C 01",  # OBD standard (OBD-II California ARB)
    "1F": "41 1F 00 8C",  # Run time since engine start (140s)
    "20": "41 20 80 01 80 01",  # Supported PIDs 21-40
    "21": "41 21 00 4B",  # Distance with MIL on (75 km)
    "33": "41 33 65",  # Barometric pressure (101 kPa)
    "40": "41 40 40 00 00 00",  # Supported PIDs 41-60
    "42": "41 42 32 FA",  # Control module voltage (13.05V: 0x32FA = 13050 mV)
    "51": "41 51 01",  # Fuel Type (Gasoline)
}
# Mode 01 PIDs whose replies are computed from the live vehicle state
# instead of being read from the static table.
DYNAMIC_PIDS = "04 05 0C 0D 10 11 2F".split()
def normalize_command(cmd):
    """Return *cmd* with every whitespace character removed, in upper case."""
    compact = "".join([ch for ch in cmd if not ch.isspace()])
    return compact.upper()
def format_response(response, add_prompt=True, command=None):
    """Frame a response the way the adapter settings ask for.

    Args:
        response (str): Response text to send back.
        add_prompt (bool): Append the ">" prompt when true.
        command (str | None): The command being answered. When given and echo
            is enabled, it is echoed on its own line before the response.

    Returns:
        str: Optional command echo, the response line, an extra blank line
        when linefeeds are enabled, and the optional prompt.

    The response itself is always included. The echo setting only controls
    whether the *command* is repeated; it must never suppress the reply.
    The spaces setting is not applied here.
    """
    parts = []
    if command is not None and vehicle_state.echo_on:
        parts.append(command + "\n")
    parts.append(response + "\n")
    if vehicle_state.linefeed_on:
        parts.append("\n")
    if add_prompt:
        parts.append(">")
    return "".join(parts)
def generate_rpm_response():
    """Build the Mode 01 PID 0C (engine RPM) reply from the vehicle state."""
    # The wire value is the RPM multiplied by 4, sent as two bytes (high, low).
    raw = 4 * vehicle_state.rpm
    high_byte = (raw >> 8) & 0xFF
    low_byte = raw & 0xFF
    return "41 0C {:02X} {:02X}".format(high_byte, low_byte)
def generate_speed_response():
    """Build the Mode 01 PID 0D (vehicle speed) reply from the vehicle state."""
    # The speed is sent unscaled as a single hex byte.
    current_speed = vehicle_state.speed
    return "41 0D {:02X}".format(current_speed)
def generate_coolant_temp_response():
    """Build the Mode 01 PID 05 (coolant temperature) reply from the vehicle state."""
    # The wire value is the temperature with an offset of +40.
    wire_value = 40 + vehicle_state.coolant_temp
    return "41 05 {:02X}".format(wire_value)
def generate_engine_load_response():
    """Build the Mode 01 PID 04 (engine load) reply from the vehicle state."""
    # A stopped engine always reports zero load.
    if not vehicle_state.engine_running:
        return "41 04 00"

    # Load percentage derived from throttle position and RPM, capped at 100.
    percent = (2 * vehicle_state.throttle_pos + vehicle_state.rpm // 100) // 3
    if percent > 100:
        percent = 100

    # Scale 0..100 % onto the 0..255 byte range.
    wire_value = (percent * 255) // 100
    return "41 04 {:02X}".format(wire_value)
def generate_maf_response():
    """Build the Mode 01 PID 10 (mass air flow) reply from the vehicle state."""
    # maf_rate is sent as-is, split into two bytes (high, low).
    raw = vehicle_state.maf_rate
    byte_pair = ((raw >> 8) & 0xFF, raw & 0xFF)
    return "41 10 %02X %02X" % byte_pair
def generate_throttle_response():
    """Build the Mode 01 PID 11 (throttle position) reply from the vehicle state."""
    # Scale the percentage onto the 0..255 byte range.
    wire_value = (vehicle_state.throttle_pos * 255) // 100
    return "41 11 {:02X}".format(wire_value)
def generate_fuel_level_response():
    """Build the Mode 01 PID 2F (fuel level) reply from the vehicle state."""
    # Scale the percentage onto the 0..255 byte range.
    wire_value = (vehicle_state.fuel_level * 255) // 100
    return "41 2F {:02X}".format(wire_value)
def handle_mode01_pid(pid):
    """Return the Mode 01 reply for *pid* (two-digit upper-case hex string).

    Dynamic PIDs are generated from the vehicle state, other known PIDs come
    from the static table, and anything else yields "NO DATA".
    """
    if pid in DYNAMIC_PIDS:
        generators = {
            "0C": generate_rpm_response,
            "0D": generate_speed_response,
            "05": generate_coolant_temp_response,
            "04": generate_engine_load_response,
            "10": generate_maf_response,
            "11": generate_throttle_response,
            "2F": generate_fuel_level_response,
        }
        generator = generators.get(pid)
        if generator is not None:
            return generator()

    # Not dynamic (or no generator registered): fall back to the static table.
    return MODE01_PID_TABLE.get(pid, "NO DATA")
def generate_vin_response():
    """Build the Mode 09 PID 02 (VIN) reply.

    The reply is the header "49 02 01" followed by each VIN character as a
    two-digit upper-case hex code.
    """
    vin = "5TDKRKEC7PS142916"
    hex_codes = [format(ord(symbol), "02X") for symbol in vin]
    return " ".join(["49 02 01"] + hex_codes)
def generate_calibration_id_response():
    """Build the Mode 09 PID 04 (calibration ID) reply.

    The reply is the header "49 04 01" followed by each character of the
    calibration ID as a two-digit upper-case hex code.
    """
    cal_id = "CAL123456"
    hex_codes = [format(ord(symbol), "02X") for symbol in cal_id]
    return " ".join(["49 04 01"] + hex_codes)
def generate_ecu_name_response():
    """Build the Mode 09 PID 0A (ECU name) reply.

    The reply is the header "49 0A 01" followed by each character of the ECU
    name as a two-digit upper-case hex code.
    """
    ecu_name = "ECU_SIM_UNIT"
    hex_codes = [format(ord(symbol), "02X") for symbol in ecu_name]
    return " ".join(["49 0A 01"] + hex_codes)
# Protocol numbers accepted by ATSP, mapped to the text ATDP reports.
_ELM_PROTOCOLS = {
    "0": "AUTO",
    "1": "SAE J1850 PWM",
    "2": "SAE J1850 VPW",
    "3": "ISO 9141-2",
    "4": "ISO 14230-4 KWP",
    "5": "ISO 14230-4 KWP (fast)",
    "6": "ISO 15765-4 CAN (11 bit, 500 kbaud)",
    "7": "ISO 15765-4 CAN (29 bit, 500 kbaud)",
    "8": "ISO 15765-4 CAN (11 bit, 250 kbaud)",
    "9": "ISO 15765-4 CAN (29 bit, 250 kbaud)",
    "A": "SAE J1939 CAN",
}

# Protocol number ATDPN reports while the adapter is in automatic mode
# (the simulated bus is ISO 15765-4 CAN, 11 bit, 500 kbaud).
_AUTO_DETECTED_PROTOCOL = "6"

# Commands answered with a fixed text and no state change.
_AT_FIXED_REPLIES = {
    "AT@1": "OBDSIM ELM327",  # device description
    "ATI": "ELM327 v1.5",  # version ID
    "ATWS": "ELM327 v1.5",  # warm start
    "ATMA": "MONITORING...\n(Press any key to stop)",  # monitor all
    "ATPC": "OK",  # protocol close
}

# On/off commands ("<prefix>0" / "<prefix>1") and the setting each one drives.
_AT_TOGGLES = {
    "ATE": "echo_on",
    "ATL": "linefeed_on",
    "ATH": "headers_on",
    "ATS": "spaces_on",
}


def handle_at_command(cmd):
    """Handle one AT command and return the adapter's reply.

    Args:
        cmd (str): Command without whitespace (e.g. "ATZ", "ATSP6");
            matching is case-insensitive.

    Returns:
        str: Reply text followed by a blank line and the ">" prompt.
        Unknown commands, and ATSP with an unknown protocol, reply "?".

    Side effects:
        ATZ and ATD rebind the global ``vehicle_state`` to a fresh
        ``VehicleState``; the toggle commands and ATSP modify it in place.
    """
    global vehicle_state

    prompt = "\n\n>"
    cmd_upper = cmd.upper()

    # ATZ (reset) and ATD (set to defaults) both start from a fresh state.
    if cmd_upper in ("ATZ", "ATD"):
        vehicle_state = VehicleState()
        return ("ELM327 v1.5" if cmd_upper == "ATZ" else "OK") + prompt

    if cmd_upper in _AT_FIXED_REPLIES:
        return _AT_FIXED_REPLIES[cmd_upper] + prompt

    # ATE0/1, ATL0/1, ATH0/1, ATS0/1
    if len(cmd_upper) == 4 and cmd_upper[:3] in _AT_TOGGLES and cmd_upper[3] in "01":
        setattr(vehicle_state, _AT_TOGGLES[cmd_upper[:3]], cmd_upper[3] == "1")
        return "OK" + prompt

    # ATSP - Set Protocol
    if cmd_upper.startswith("ATSP"):
        protocol_num = cmd_upper[4:] or "0"
        # "ATSP00" and "ATSPAh" (automatic search starting at protocol h)
        # both leave the adapter in automatic mode.
        if protocol_num == "00" or (
            len(protocol_num) == 2
            and protocol_num[0] == "A"
            and protocol_num[1] in _ELM_PROTOCOLS
        ):
            protocol_num = "0"
        if protocol_num not in _ELM_PROTOCOLS:
            # Reject unknown protocols instead of silently selecting AUTO.
            return "?" + prompt
        vehicle_state.protocol = _ELM_PROTOCOLS[protocol_num]
        return "OK" + prompt

    # ATDP - Describe Protocol
    if cmd_upper == "ATDP":
        return f"{vehicle_state.protocol}{prompt}"

    # ATDPN - Describe Protocol by Number: report the protocol chosen with
    # ATSP, or the auto-detected one while in automatic mode.
    if cmd_upper == "ATDPN":
        number = next(
            (
                num
                for num, name in _ELM_PROTOCOLS.items()
                if num != "0" and name == vehicle_state.protocol
            ),
            _AUTO_DETECTED_PROTOCOL,
        )
        return number + prompt

    # ATRV - Read Voltage
    if cmd_upper == "ATRV":
        return f"{vehicle_state.voltage:.1f}V{prompt}"

    # ATAT (adaptive timing) and ATST (set timeout) are accepted and ignored.
    if cmd_upper.startswith(("ATAT", "ATST")):
        return "OK" + prompt

    # Default response for unknown AT commands
    return "?" + prompt
def handle_obd_command(normalized):
    """Answer an OBD-II request given as a whitespace-free upper-case string.

    Advances the vehicle simulation by one tick, then dispatches on the mode
    (first two characters). Every reply ends with a blank line and the ">"
    prompt; unsupported requests answer "NO DATA".
    """
    vehicle_state.update()
    prompt = "\n\n>"

    # Mode 03 - stored DTCs: 1 to 5 codes drawn at random from the pool.
    if normalized == "03":
        how_many = random.randint(1, 5)
        drawn = random.sample(DTC_POOL, min(how_many, len(DTC_POOL)))
        return " ".join(["43"] + [code for code, _ in drawn]) + prompt

    # Mode 07 - pending DTCs: 0 to 2 codes; zero yields the bare "47".
    if normalized == "07":
        how_many = random.randint(0, 2)
        drawn = []
        if how_many != 0:
            drawn = random.sample(DTC_POOL, min(how_many, len(DTC_POOL)))
        return " ".join(["47"] + [code for code, _ in drawn]) + prompt

    # Mode 04 - clear DTCs: acknowledged only.
    if normalized == "04":
        return "44" + prompt

    if len(normalized) >= 4:
        mode, pid = normalized[:2], normalized[2:4]

        # Mode 09 - vehicle information
        if mode == "09":
            if pid == "00":
                payload = "49 00 54 40 00 00"  # supported PIDs
            else:
                info_generators = {
                    "02": generate_vin_response,
                    "04": generate_calibration_id_response,
                    "0A": generate_ecu_name_response,
                }
                payload = info_generators[pid]() if pid in info_generators else "NO DATA"
            return payload + prompt

        # Mode 01 - current data
        if mode == "01":
            return handle_mode01_pid(pid) + prompt

    return "NO DATA" + prompt
def send_elm327_command(command):
    """
    Send a command to the ELM327 OBD-II adapter and get the response.

    Args:
        command (str): ELM327 command to send (e.g., 'ATZ', '01 0D')

    Returns:
        str: The response from the ELM327 adapter
    """
    # An empty or whitespace-only command just yields the prompt.
    trimmed = command.strip() if command else ""
    if not trimmed:
        return ">"

    normalized = normalize_command(trimmed)

    # AT commands configure the adapter; everything else is an OBD request.
    handler = handle_at_command if normalized.startswith("AT") else handle_obd_command
    return handler(normalized)
def get_system_status():
    """
    Get system status including IP address, network status, uptime, and memory.

    Returns:
        str: System status information
    """
    # Imports stay function-local, as in the original, so psutil is only
    # needed when this endpoint is actually called.
    import json
    import random
    import socket

    import psutil

    # Find the IP of the default-route interface by connecting a UDP socket.
    # The context manager closes the socket even when connect() fails.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip_address = probe.getsockname()[0]
    except OSError:
        # No usable route: fall back to loopback.
        ip_address = "127.0.0.1"

    # Uptime in seconds since boot
    uptime_seconds = int(time.time() - psutil.boot_time())

    # Memory currently available, in bytes
    free_memory_bytes = psutil.virtual_memory().available

    # Simulated WiFi RSSI: a random value between -70 and -40 dBm.
    # In real hardware this would come from the WiFi driver.
    wifi_rssi_dbm = random.randint(-70, -40)

    # Build JSON response matching W600 MCP format
    status_data = {
        "ip_address": ip_address,
        "uptime_seconds": uptime_seconds,
        "free_memory_bytes": free_memory_bytes,
        "wifi_rssi_dbm": wifi_rssi_dbm,
        "elm327_status": "Simulated ELM327 on Gradio",
    }
    return json.dumps(status_data, indent=2)
def get_elm327_history(count):
    """
    Get historical log of OBD-II data (RPM, speed, coolant temp) with streaming support.
    Returns the last N records.

    Args:
        count (int): Number of most recent records to retrieve (default: 20, max: 20)

    Returns:
        str: Historical OBD-II data or error message
    """
    import json
    import math

    max_records = 20

    if count is None or count == "":
        count = max_records
    else:
        try:
            count = int(count)
        except (TypeError, ValueError, OverflowError):
            # Answer with the same JSON error shape as the limit check below
            # instead of letting the conversion error escape.
            return json.dumps({
                "error": "Invalid count",
                "message": f"Count must be a whole number no greater than {max_records}, got {count!r}.",
                "requested": repr(count),
                "max_allowed": max_records
            }, indent=2)

    # Enforce maximum limit of 20 records
    if count > max_records:
        return json.dumps({
            "error": "Request exceeds maximum limit",
            "message": f"Requested {count} records, but maximum allowed is 20 records. Please request 20 or fewer records.",
            "requested": count,
            "max_allowed": max_records
        }, indent=2)

    # Generate simulated, fully deterministic test data.
    # A count of zero or less produces an empty list.
    history_records = []
    base_time = 0  # Base timestamp in seconds (simulated uptime)

    for i in range(count):
        # RPM: ramp from 800 over a 50-sample cycle, plus a sine wave and a
        # deterministic "noise" term, clamped to 700..5000.
        cycle_position = (i % 50) / 50.0
        sine_variation = math.sin(i * 0.1) * 400
        random_noise = ((i * 7) % 300) - 150
        base_rpm = 800 + 2700 * cycle_position
        rpm = int(base_rpm + sine_variation + random_noise)
        rpm = max(700, min(rpm, 5000))

        # Speed: follows RPM with extra variation, clamped to 0..130 km/h.
        speed_factor = (rpm - 800) / 35
        speed_variation = math.cos(i * 0.15) * 15
        speed = int(speed_factor + speed_variation + ((i * 3) % 20) - 10)
        speed = max(0, min(speed, 130))

        # Coolant temperature: warm-up curve capped at 95 °C. With the
        # current 20-record limit only the first branch is ever taken.
        if i < 20:
            coolant_temp = 20 + i * 3
        elif i < 40:
            coolant_temp = 60 + (i - 20) * 1
        else:
            coolant_temp = 80 + ((i * 5) % 15)
        coolant_temp = min(coolant_temp, 95)

        # Record matching W600 MCP format; samples are 2 seconds apart.
        history_records.append({
            "seq": i,
            "time": base_time + (i * 2),
            "rpm": rpm,
            "speed": speed,
            "coolant_temp": coolant_temp
        })

    # Format as JSON array (matching MCP format)
    return json.dumps(history_records)
# Interface for ELM327 commands: one text box in, one text box out,
# backed by send_elm327_command().
elm327_interface = gr.Interface(
    fn=send_elm327_command,
    inputs=gr.Textbox(label="Command", placeholder="e.g., ATZ, 01 0D"),
    outputs=gr.Textbox(label="Response"),
    title="ELM327 Commands",
    description="Send commands to the ELM327 OBD-II adapter"
)
# Interface for system status: takes no input and shows the JSON string
# returned by get_system_status().
status_interface = gr.Interface(
    fn=get_system_status,
    inputs=None,
    outputs=gr.Textbox(label="Status"),
    title="System Status",
    description="Get system status including IP address, network status, uptime, and memory"
)
# Interface for ELM327 history: a numeric record count (pre-filled with 20)
# in, the JSON string returned by get_elm327_history() out.
history_interface = gr.Interface(
    fn=get_elm327_history,
    inputs=gr.Number(label="Count", value=20, precision=0,
                     info="Number of most recent records to retrieve (default: 20, max: 20)"),
    outputs=gr.Textbox(label="History Data"),
    title="ELM327 History",
    description="Get historical log of OBD-II data (RPM, speed, coolant temp) - Maximum 20 records"
)
# Combine all interfaces with tabs; the tab names are listed in the same
# order as the interfaces.
demo = gr.TabbedInterface(
    [elm327_interface, status_interface, history_interface],
    ["ELM327 Commands", "System Status", "History"]
)
# Launch the app only when run as a script; mcp_server=True is passed
# through to Gradio's launch().
if __name__ == "__main__":
    demo.launch(mcp_server=True)