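# Page residue kept for provenance (apparently a repository page header):
# author avatar caption "castlebbs's picture", commit c8b8fda,
# commit message "Update max count for history".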
import gradio as gr
import time
import random
# Simulated vehicle whose readings drift a little on every poll
class VehicleState:
    """Mutable snapshot of the simulated vehicle plus the adapter's settings."""

    def __init__(self):
        # Engine / sensor readings
        self.engine_running = True
        self.rpm = 800  # Idle RPM
        self.speed = 0
        self.coolant_temp = 20  # °C
        self.throttle_pos = 0
        self.maf_rate = 250  # 2.5 g/s idle (scaled by 100)
        self.fuel_level = 75  # 75%
        self.update_counter = 0

        # ELM327 settings
        self.echo_on = True
        self.linefeed_on = True
        self.headers_on = False
        self.spaces_on = True
        self.protocol = "AUTO"
        self.voltage = 13.05

    def update(self):
        """Advance the simulation by one tick.

        The tick counter always increments; the readings only change while
        the engine is running.
        """
        self.update_counter += 1
        tick = self.update_counter

        if self.engine_running:
            # Idle RPM wobbles deterministically within 750..849.
            self.rpm = 750 + (tick * 7) % 100

            # Warm up by one degree every tenth tick until 90 °C is reached.
            if tick % 10 == 0 and self.coolant_temp < 90:
                self.coolant_temp += 1

            # Every fiftieth tick pick a new throttle position and matching speed.
            if tick % 50 == 0:
                self.throttle_pos = (tick // 5) % 30
                self.speed = 2 * self.throttle_pos

            # Air flow tracks the throttle position.
            self.maf_rate = 250 + 5 * self.throttle_pos
# Global vehicle state shared by every command handler in this module.
# handle_at_command rebinds this name to a fresh VehicleState on ATZ and ATD,
# so readers must look it up at call time rather than caching the object.
vehicle_state = VehicleState()
# Pool of possible DTCs for testing
# Format: (code_bytes, description)
# Every entry must be unique: Mode 03/07 draw from this list with
# random.sample, so a duplicated entry could appear twice in one reply.
DTC_POOL = [
    ("01 71", "P0171 - System Too Lean (Bank 1)"),
    ("03 00", "P0300 - Random/Multiple Cylinder Misfire Detected"),
    ("04 42", "P0442 - EVAP Emission Control System Leak Detected (small leak)"),
    ("01 31", "P0131 - O2 Sensor Circuit Low Voltage (Bank 1, Sensor 1)"),
    ("01 33", "P0133 - O2 Sensor Circuit Slow Response (Bank 1, Sensor 1)"),
    ("01 72", "P0172 - System Too Rich (Bank 1)"),
    ("02 02", "P0202 - Injector Circuit Malfunction - Cylinder 2"),
    ("03 20", "P0320 - Ignition/Distributor Engine Speed Input Circuit Malfunction"),
    ("04 20", "P0420 - Catalyst System Efficiency Below Threshold (Bank 1)"),
    ("05 00", "P0500 - Vehicle Speed Sensor Malfunction"),
    ("07 05", "P0705 - Transmission Range Sensor Circuit Malfunction (PRNDL Input)"),
    ("13 00", "P1300 - Igniter Circuit Malfunction"),
    ("00 16", "P0016 - Crankshaft Position/Camshaft Position Correlation (Bank 1)"),
    ("01 28", "P0128 - Coolant Thermostat (Coolant Temp Below Thermostat Regulating Temp)"),
]
# Mode 01 PID table (static responses)
# Keys are two-character upper-case PID strings; values are the complete
# reply line (mode byte 41, the PID, then the data bytes) returned verbatim
# by handle_mode01_pid.
MODE01_PID_TABLE = {
    "00": "41 00 BE 3F B8 13",  # Supported PIDs 01-20
    "01": "41 01 83 07 65 04",  # Monitor status (MIL on, 3 DTCs)
    "03": "41 03 02 00",  # Fuel system status (closed loop)
    "06": "41 06 80",  # Short term fuel trim
    "07": "41 07 80",  # Long term fuel trim
    "0A": "41 0A B4",  # Fuel pressure (540 kPa)
    "0B": "41 0B 63",  # Intake manifold pressure (99 kPa)
    # NOTE(review): under the usual A/2 - 64 decoding, 0x7C is -2 degrees,
    # not 14; 14 degrees would be 0x9C. Confirm which one is intended.
    "0E": "41 0E 7C",  # Timing advance (14 degrees)
    "0F": "41 0F 54",  # Intake air temperature (44°C)
    "13": "41 13 03",  # O2 sensors present
    "1C": "41 1C 01",  # OBD standard (OBD-II California ARB)
    "1F": "41 1F 00 8C",  # Run time since engine start (140s)
    "20": "41 20 80 01 80 01",  # Supported PIDs 21-40
    "21": "41 21 00 4B",  # Distance with MIL on (75 km)
    "33": "41 33 65",  # Barometric pressure (101 kPa)
    "40": "41 40 40 00 00 00",  # Supported PIDs 41-60
    # NOTE(review): 0x32E8 is 13032, i.e. 13.032 V at 1 mV per bit; 13.05 V
    # would be 0x32FA. Confirm against VehicleState.voltage.
    "42": "41 42 32 E8",  # Control module voltage (13.05V)
    "51": "41 51 01",  # Fuel Type (Gasoline)
}
# Dynamic PIDs (generated from vehicle state)
# handle_mode01_pid checks this list before the static table above.
DYNAMIC_PIDS = ["04", "05", "0C", "0D", "10", "11", "2F"]
def normalize_command(cmd):
    """Return *cmd* upper-cased with every run of whitespace removed."""
    tokens = cmd.split()
    compact = "".join(tokens)
    return compact.upper()
def format_response(response, add_prompt=True, command=None):
    """Frame a reply line according to the adapter's echo/linefeed settings.

    The reply text is always included. Previously it was emitted only while
    echo was on, so turning echo off silently discarded the reply. Echo now
    controls only whether the originating command is repeated first.

    Args:
        response (str): Reply text to frame.
        add_prompt (bool): Append the ">" prompt when True.
        command (str | None): Command that produced the reply. It is echoed
            on its own line when echo is on; omit it to get no echo.

    Returns:
        str: The framed reply.
    """
    lines = []
    if vehicle_state.echo_on and command is not None:
        lines.append(command)
    lines.append(response)

    result = "\n".join(lines) + "\n"
    # With linefeeds on, a blank line separates the reply from the prompt.
    if vehicle_state.linefeed_on:
        result += "\n"
    if add_prompt:
        result += ">"
    return result
def generate_rpm_response():
    """Build the Mode 01 PID 0C (engine RPM) reply from the current vehicle state."""
    # The reply carries RPM multiplied by four, split into high and low bytes.
    quarter_revs = 4 * vehicle_state.rpm
    high_byte = (quarter_revs >> 8) & 0xFF
    low_byte = quarter_revs & 0xFF
    return "41 0C {:02X} {:02X}".format(high_byte, low_byte)
def generate_speed_response():
    """Build the Mode 01 PID 0D (vehicle speed) reply from the current vehicle state."""
    # The speed value is sent as-is, formatted as at least two hex digits.
    speed_hex = format(vehicle_state.speed, "02X")
    return "41 0D " + speed_hex
def generate_coolant_temp_response():
    """Build the Mode 01 PID 05 (coolant temperature) reply from the current vehicle state."""
    # The temperature is transmitted with a +40 offset.
    offset_temp = 40 + vehicle_state.coolant_temp
    return "41 05 {:02X}".format(offset_temp)
def generate_engine_load_response():
    """Build the Mode 01 PID 04 (engine load) reply from the current vehicle state."""
    # A stopped engine always reports zero load.
    if not vehicle_state.engine_running:
        return "41 04 00"

    # Rough load estimate from throttle and RPM, capped at 100 percent.
    estimate = (2 * vehicle_state.throttle_pos + vehicle_state.rpm // 100) // 3
    percent = estimate if estimate < 100 else 100

    # Scale the percentage onto the 0..255 byte range.
    scaled = percent * 255 // 100
    return "41 04 {:02X}".format(scaled)
def generate_maf_response():
    """Build the Mode 01 PID 10 (mass air flow) reply from the current vehicle state."""
    # maf_rate is already stored in the scaled integer form; split it into two bytes.
    flow = vehicle_state.maf_rate
    high_byte = (flow >> 8) & 0xFF
    low_byte = flow & 0xFF
    return "41 10 {:02X} {:02X}".format(high_byte, low_byte)
def generate_throttle_response():
    """Build the Mode 01 PID 11 (throttle position) reply from the current vehicle state."""
    # Treat throttle_pos as a percentage and scale it onto 0..255.
    scaled = vehicle_state.throttle_pos * 255 // 100
    return "41 11 {:02X}".format(scaled)
def generate_fuel_level_response():
    """Build the Mode 01 PID 2F (fuel level) reply from the current vehicle state."""
    # Treat fuel_level as a percentage and scale it onto 0..255.
    scaled = vehicle_state.fuel_level * 255 // 100
    return "41 2F {:02X}".format(scaled)
def handle_mode01_pid(pid):
    """Return the Mode 01 reply line for *pid*, or "NO DATA" if it is unknown.

    Dynamic PIDs are answered from the live vehicle state; everything else
    is looked up in the static table.
    """
    live_generators = {
        "0C": generate_rpm_response,
        "0D": generate_speed_response,
        "05": generate_coolant_temp_response,
        "04": generate_engine_load_response,
        "10": generate_maf_response,
        "11": generate_throttle_response,
        "2F": generate_fuel_level_response,
    }

    # A PID must be listed as dynamic AND have a generator to be served live;
    # otherwise it falls through to the static table.
    if pid in DYNAMIC_PIDS and pid in live_generators:
        return live_generators[pid]()

    return MODE01_PID_TABLE.get(pid, "NO DATA")
def _encode_mode09_text(header, text):
    """Return *header* followed by each character of *text* as a two-digit hex code."""
    return " ".join([header] + [f"{ord(char):02X}" for char in text])


def generate_vin_response():
    """Generate Mode 09 PID 02 response (VIN)"""
    return _encode_mode09_text("49 02 01", "5TDKRKEC7PS142916")


def generate_calibration_id_response():
    """Generate Mode 09 PID 04 response (Calibration ID)"""
    return _encode_mode09_text("49 04 01", "CAL123456")


def generate_ecu_name_response():
    """Generate Mode 09 PID 0A response (ECU Name)"""
    return _encode_mode09_text("49 0A 01", "ECU_SIM_UNIT")
# Protocol numbers accepted by ATSP, mapped to the description reported by ATDP.
_ELM327_PROTOCOLS = {
    "0": "AUTO",
    "1": "SAE J1850 PWM",
    "2": "SAE J1850 VPW",
    "3": "ISO 9141-2",
    "4": "ISO 14230-4 KWP",
    "5": "ISO 14230-4 KWP (fast)",
    "6": "ISO 15765-4 CAN (11 bit, 500 kbaud)",
    "7": "ISO 15765-4 CAN (29 bit, 500 kbaud)",
    "8": "ISO 15765-4 CAN (11 bit, 250 kbaud)",
    "9": "ISO 15765-4 CAN (29 bit, 250 kbaud)",
    "A": "SAE J1939 CAN",
}


def handle_at_command(cmd):
    """Handle AT commands.

    Args:
        cmd (str): Normalized AT command (no whitespace); case is ignored.

    Returns:
        str: The adapter reply, terminated by a blank line and the ">" prompt.
            Unknown commands yield "?".

    Side effects:
        ATZ and ATD rebind the module-level ``vehicle_state`` to a fresh
        VehicleState; the E/L/H/S toggles and ATSP mutate it in place.
    """
    global vehicle_state
    cmd_upper = cmd.upper()

    # ATZ - Reset / ATD - Set to Defaults: both start from a fresh state.
    if cmd_upper in ("ATZ", "ATD"):
        vehicle_state = VehicleState()
        return "ELM327 v1.5\n\n>" if cmd_upper == "ATZ" else "OK\n\n>"

    # Commands whose reply never depends on state.
    fixed_replies = {
        "AT@1": "OBDSIM ELM327\n\n>",  # Device description
        "ATI": "ELM327 v1.5\n\n>",  # Version ID
        "ATWS": "ELM327 v1.5\n\n>",  # Warm Start
        "ATMA": "MONITORING...\n(Press any key to stop)\n\n>",  # Monitor All
        "ATPC": "OK\n\n>",  # Protocol Close
    }
    if cmd_upper in fixed_replies:
        return fixed_replies[cmd_upper]

    # On/off settings: command -> (VehicleState attribute, new value).
    toggles = {
        "ATE0": ("echo_on", False),
        "ATE1": ("echo_on", True),
        "ATL0": ("linefeed_on", False),
        "ATL1": ("linefeed_on", True),
        "ATH0": ("headers_on", False),
        "ATH1": ("headers_on", True),
        "ATS0": ("spaces_on", False),
        "ATS1": ("spaces_on", True),
    }
    if cmd_upper in toggles:
        attribute, value = toggles[cmd_upper]
        setattr(vehicle_state, attribute, value)
        return "OK\n\n>"

    # ATSP - Set Protocol (a missing or unknown number selects AUTO)
    if cmd_upper.startswith("ATSP"):
        protocol_num = cmd_upper[4:] or "0"
        vehicle_state.protocol = _ELM327_PROTOCOLS.get(protocol_num, "AUTO")
        return "OK\n\n>"

    # ATDP - Describe Protocol
    if cmd_upper == "ATDP":
        return f"{vehicle_state.protocol}\n\n>"

    # ATDPN - Describe Protocol by Number. Report the protocol chosen with
    # ATSP so that ATDP and ATDPN agree. Under AUTO keep answering 6
    # (ISO 15765-4 CAN, 11 bit, 500 kbaud), the simulator's previous fixed reply.
    if cmd_upper == "ATDPN":
        number = next(
            (num for num, name in _ELM327_PROTOCOLS.items()
             if name == vehicle_state.protocol),
            "0",
        )
        if number == "0":
            number = "6"
        return f"{number}\n\n>"

    # ATRV - Read Voltage
    if cmd_upper == "ATRV":
        return f"{vehicle_state.voltage:.1f}V\n\n>"

    # ATAT - Adaptive Timing / ATST - Set Timeout: accepted and ignored.
    if cmd_upper.startswith(("ATAT", "ATST")):
        return "OK\n\n>"

    # Default response for unknown AT commands
    return "?\n\n>"
def handle_obd_command(normalized):
    """Answer an OBD-II request given as a normalized hex string.

    Every call first advances the vehicle simulation by one tick. Modes 03,
    07 and 04 are matched exactly; modes 09 and 01 need at least a two-digit
    PID after the mode. Anything else yields "NO DATA".
    """
    vehicle_state.update()
    prompt = "\n\n>"

    # Mode 03 - stored DTCs: one to five random codes from the pool.
    if normalized == "03":
        how_many = random.randint(1, 5)
        stored = random.sample(DTC_POOL, min(how_many, len(DTC_POOL)))
        return " ".join(["43"] + [code for code, _ in stored]) + prompt

    # Mode 07 - pending DTCs: zero to two random codes from the pool.
    if normalized == "07":
        how_many = random.randint(0, 2)
        # Only draw a sample when at least one code is wanted.
        pending = (
            random.sample(DTC_POOL, min(how_many, len(DTC_POOL)))
            if how_many
            else []
        )
        return " ".join(["47"] + [code for code, _ in pending]) + prompt

    # Mode 04 - clear DTCs: acknowledged only.
    if normalized == "04":
        return "44" + prompt

    if len(normalized) >= 4:
        mode, pid = normalized[:2], normalized[2:4]

        # Mode 09 - vehicle information
        if mode == "09":
            info_builders = {
                "00": lambda: "49 00 54 40 00 00",  # Supported PIDs
                "02": generate_vin_response,  # VIN
                "04": generate_calibration_id_response,  # Calibration ID
                "0A": generate_ecu_name_response,  # ECU Name
            }
            builder = info_builders.get(pid)
            reply = builder() if builder is not None else "NO DATA"
            return reply + prompt

        # Mode 01 - current data
        if mode == "01":
            return handle_mode01_pid(pid) + prompt

    return "NO DATA" + prompt
def send_elm327_command(command):
    """
    Send a command to the ELM327 OBD-II adapter and get the response.
    Args:
        command (str): ELM327 command to send (e.g., 'ATZ', '01 0D')
    Returns:
        str: The response from the ELM327 adapter
    """
    # NOTE(review): this docstring is presumably surfaced to clients when the
    # app is launched with mcp_server=True, so its wording is kept as-is.
    stripped = command.strip() if command else ""

    # Nothing to run: answer with a bare prompt.
    if not stripped:
        return ">"

    compact = normalize_command(stripped)

    # AT commands configure the adapter; everything else is an OBD request.
    handler = handle_at_command if compact.startswith("AT") else handle_obd_command
    return handler(compact)
def get_system_status():
    """
    Get system status including IP address, network status, uptime, and memory.
    Returns:
        str: System status information
    """
    import json
    import socket
    import psutil

    # Get IP address: ask the OS which local address it would use to reach
    # a public host. The socket is managed by `with` so it is closed even
    # when connect() or getsockname() fails.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            ip_address = probe.getsockname()[0]
    except OSError:
        # No usable route or socket: fall back to loopback.
        ip_address = "127.0.0.1"

    # Get uptime in seconds
    uptime_seconds = int(time.time() - psutil.boot_time())

    # Get free memory in bytes
    free_memory_bytes = psutil.virtual_memory().available

    # Simulate WiFi RSSI (for simulation, use a random value between -40 and -70 dBm)
    # In real hardware this would come from the WiFi driver
    wifi_rssi_dbm = random.randint(-70, -40)

    # Build JSON response matching W600 MCP format
    status_data = {
        "ip_address": ip_address,
        "uptime_seconds": uptime_seconds,
        "free_memory_bytes": free_memory_bytes,
        "wifi_rssi_dbm": wifi_rssi_dbm,
        "elm327_status": "Simulated ELM327 on Gradio",
    }

    # Return formatted JSON string
    return json.dumps(status_data, indent=2)
def get_elm327_history(count):
    """
    Get historical log of OBD-II data (RPM, speed, coolant temp).
    Returns the last N records (deterministic simulated data).
    Args:
        count (int): Number of most recent records to retrieve (default: 20, max: 20)
    Returns:
        str: Historical OBD-II data or error message
    """
    import json
    import math

    max_records = 20

    if count is None or count == "":
        count = max_records
    else:
        try:
            count = int(count)
        except (TypeError, ValueError, OverflowError):
            # Report unusable input in the same JSON error shape as the
            # limit check instead of raising.
            return json.dumps({
                "error": "Invalid count",
                "message": f"Count must be a whole number between 0 and {max_records}, got {count!r}.",
                "max_allowed": max_records
            }, indent=2)

    # Enforce maximum limit of 20 records
    if count > max_records:
        return json.dumps({
            "error": "Request exceeds maximum limit",
            "message": f"Requested {count} records, but maximum allowed is {max_records} records. Please request {max_records} or fewer records.",
            "requested": count,
            "max_allowed": max_records
        }, indent=2)

    # Generate simulated test data
    history_records = []
    # Base timestamp in seconds (simulating uptime)
    base_time = 0

    # A zero or negative count produces an empty list.
    for i in range(count):
        # RPM: a ramp over a 50-sample cycle plus a sine wobble and a
        # deterministic pseudo-noise term, clamped to 700..5000.
        cycle_position = (i % 50) / 50.0
        sine_variation = math.sin(i * 0.1) * 400
        pseudo_noise = ((i * 7) % 300) - 150
        rpm = int(800 + 2700 * cycle_position + sine_variation + pseudo_noise)
        rpm = max(700, min(rpm, 5000))

        # Speed follows the clamped RPM with its own variation, clamped to 0..130 km/h.
        speed_factor = (rpm - 800) / 35
        speed_variation = math.cos(i * 0.15) * 15
        speed = int(speed_factor + speed_variation + ((i * 3) % 20) - 10)
        speed = max(0, min(speed, 130))

        # Coolant temp starts cold and warms up, then stabilizes. With the
        # current 20-record limit only the first branch is reachable; the
        # others apply if the limit is raised.
        if i < 20:
            coolant_temp = 20 + i * 3
        elif i < 40:
            coolant_temp = 60 + (i - 20)
        else:
            coolant_temp = 80 + ((i * 5) % 15)
        coolant_temp = min(coolant_temp, 95)  # Cap at 95°C

        # Create record matching W600 MCP format; samples are 2 seconds apart.
        history_records.append({
            "seq": i,
            "time": base_time + (i * 2),
            "rpm": rpm,
            "speed": speed,
            "coolant_temp": coolant_temp
        })

    # Format as JSON array (matching MCP format)
    return json.dumps(history_records)
# Interface for ELM327 commands
# One free-text input; the reply string from send_elm327_command is shown as-is.
elm327_interface = gr.Interface(
    fn=send_elm327_command,
    inputs=gr.Textbox(label="Command", placeholder="e.g., ATZ, 01 0D"),
    outputs=gr.Textbox(label="Response"),
    title="ELM327 Commands",
    description="Send commands to the ELM327 OBD-II adapter"
)
# Interface for system status
# get_system_status takes no arguments, hence inputs=None; it returns a JSON string.
status_interface = gr.Interface(
    fn=get_system_status,
    inputs=None,
    outputs=gr.Textbox(label="Status"),
    title="System Status",
    description="Get system status including IP address, network status, uptime, and memory"
)
# Interface for ELM327 history
# The default value and the limit quoted in the text both mirror the
# 20-record cap enforced inside get_elm327_history.
history_interface = gr.Interface(
    fn=get_elm327_history,
    inputs=gr.Number(label="Count", value=20, precision=0,
                     info="Number of most recent records to retrieve (default: 20, max: 20)"),
    outputs=gr.Textbox(label="History Data"),
    title="ELM327 History",
    description="Get historical log of OBD-II data (RPM, speed, coolant temp) - Maximum 20 records"
)
# Combine all interfaces with tabs
# The tab titles are matched to the interfaces by position.
demo = gr.TabbedInterface(
    [elm327_interface, status_interface, history_interface],
    ["ELM327 Commands", "System Status", "History"]
)
# Start the app only when run as a script.
# NOTE(review): mcp_server=True presumably also exposes the three functions
# above as MCP tools — confirm against the Gradio version in use.
if __name__ == "__main__":
    demo.launch(mcp_server=True)