Spaces:
Sleeping
Sleeping
| """ | |
| EV Camper Dashboard - FastAPI Backend | |
| """ | |
| import csv | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import FastAPI, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse, JSONResponse | |
# Application object; the title and version are passed straight to FastAPI.
app = FastAPI(title="EV Camper Dashboard API", version="1.0")

# CORS is fully open: every origin, method and header is allowed.
# NOTE(review): presumably meant for demo/local use -- confirm before exposing
# this service more widely.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Root of the CSV files, read as DATA_DIR / <stream> / "<RESOLUTION>.csv".
# Both paths are relative, so they resolve against the working directory.
DATA_DIR = Path("output")
# JSON configuration file read by load_config().
CONFIG_PATH = Path("data.json")

# Resolution names accepted per stream; each one is also a CSV file stem.
POWER_RESOLUTIONS = {"1SEC", "1MIN", "15MIN", "1H", "1DAY"}
WATER_RESOLUTIONS = {"1MIN", "15MIN", "1H", "1DAY"}
# -- Helpers ------------------------------------------------------------------
def read_csv(path: Path, limit: Optional[int] = None, offset: int = 0) -> list[dict]:
    """Load rows from a CSV file, converting numeric cells to ``float``.

    Args:
        path: CSV file to read. A missing file yields an empty list.
        limit: Maximum number of rows to return; ``None`` or ``0`` means no cap.
        offset: Number of leading data rows to skip before collecting.

    Returns:
        One dict per data row, keyed by column name. The ``Time`` column is
        kept as the raw string; every other cell becomes a ``float`` when it
        parses as one and is left untouched otherwise.
    """
    if not path.exists():
        return []

    # newline="" is required by the csv module so that line breaks embedded in
    # quoted fields reach the parser untranslated.
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        rows = []
        for index, row in enumerate(reader):
            if index < offset:
                continue
            parsed = {}
            for key, value in row.items():
                if key == "Time":
                    parsed[key] = value
                    continue
                try:
                    parsed[key] = float(value)
                except (ValueError, TypeError):
                    # Blank or textual cells, plus the None/list placeholders
                    # DictReader produces for ragged rows, are kept verbatim.
                    parsed[key] = value
            rows.append(parsed)
            if limit and len(rows) >= limit:
                break
    return rows
def load_config() -> dict:
    """Read the file at ``CONFIG_PATH`` and return its parsed JSON content."""
    with CONFIG_PATH.open() as config_file:
        raw_text = config_file.read()
    return json.loads(raw_text)
def get_csv_headers(path: Path) -> list[str]:
    """Return the column names found on the first row of a CSV file.

    A missing file, or a file with no rows at all, gives an empty list.
    """
    if path.exists():
        with open(path) as handle:
            # The first parsed row is the header; leave as soon as we have it.
            for header_row in csv.reader(handle):
                return header_row
    return []
def infer_type(col: str) -> str:
    """Guess a schema type label for a column from its name alone.

    ``Time`` maps to ``"string (ISO8601)"``; a name containing any of the
    numeric markers maps to ``"number"``; everything else is ``"string"``.
    """
    if col == "Time":
        return "string (ISO8601)"
    # Substrings that mark a column as numeric.
    numeric_markers = ("Pct", "Level", "Ah", "Voltage", "Flow", "Total", "kWh", "Lpm", "_L")
    is_numeric = any(marker in col for marker in numeric_markers)
    return "number" if is_numeric else "string"
# -- Schema & Data Tables APIs ------------------------------------------------
def _build_schema_and_mermaid() -> tuple[dict, str]:
    """Build the schema dict and a Mermaid ER diagram from the CSV headers on disk.

    Returns:
        A ``({"tables": {...}}, mermaid_source)`` pair. ``tables`` maps
        ``"<stream>_<RESOLUTION>"`` to that table's stream, resolution and
        column list (name plus inferred type). CSV files that are missing or
        have no header row are left out. The diagram has one entity per
        stream, holding the union of the columns seen across its resolutions.
    """

    def describe_stream(stream: str, resolutions) -> dict:
        """Collect table metadata for every readable CSV of one stream."""
        found = {}
        # Sets iterate in arbitrary order; sort so the output is reproducible.
        for res in sorted(resolutions):
            # get_csv_headers() already returns [] for a missing file.
            headers = get_csv_headers(DATA_DIR / stream / f"{res}.csv")
            if not headers:
                continue
            found[f"{stream}_{res}"] = {
                "stream": stream,
                "resolution": res,
                "columns": [{"name": h, "type": infer_type(h)} for h in headers],
            }
        return found

    def entity_rows(stream: str) -> list[str]:
        """Render the attribute lines of one stream's Mermaid entity."""
        columns = {
            (col["name"], col["type"])
            for meta in tables.values()
            if meta["stream"] == stream
            for col in meta["columns"]
        }
        # "Time" first, the remaining columns alphabetically.
        ordered = sorted(columns, key=lambda col: (col[0] != "Time", col[0]))
        rendered = []
        for name, type_label in ordered:
            is_text = "string" in type_label or "ISO" in type_label
            mermaid_type = "string" if is_text else "decimal"
            # Replace spaces so the column name stays a single attribute token.
            attr = name.replace(" ", "_")
            rendered.append(f" {mermaid_type} {attr}")
        return rendered

    tables = {
        **describe_stream("power", POWER_RESOLUTIONS),
        **describe_stream("water", WATER_RESOLUTIONS),
    }

    lines = [
        "erDiagram",
        " power_timeseries {",
        *entity_rows("power"),
        " }",
        " water_timeseries {",
        *entity_rows("water"),
        " }",
        " power_timeseries ||--o| water_timeseries : aligned_by_Time",
    ]
    return {"tables": tables}, "\n".join(lines)
def get_schema():
    """Return the CSV schema (tables, columns, types) plus its Mermaid ER diagram.

    The result is the schema dict with the diagram source added under the
    ``"mermaid"`` key.
    """
    tables_payload, diagram = _build_schema_and_mermaid()
    response = dict(tables_payload)
    response["mermaid"] = diagram
    return response
def get_data_tables_list():
    """List the data tables that exist on disk.

    Returns:
        ``{"tables": [...]}`` with one entry per existing CSV file, carrying
        its id (``"<stream>_<RESOLUTION>"``), stream, resolution, column names
        and ``row_count``. ``row_count`` is the number of lines in the file
        minus the header line, never below zero. Power tables come first,
        then water, each sorted by resolution name.
    """
    result = []
    streams = (("power", POWER_RESOLUTIONS), ("water", WATER_RESOLUTIONS))
    for stream, resolutions in streams:
        # Sets iterate in arbitrary order; sort so the listing is reproducible.
        for res in sorted(resolutions):
            path = DATA_DIR / stream / f"{res}.csv"
            if not path.exists():
                continue
            headers = get_csv_headers(path)
            with open(path) as f:
                line_count = sum(1 for _ in f)
            result.append({
                "id": f"{stream}_{res}",
                "stream": stream,
                "resolution": res,
                "columns": headers,
                "row_count": max(0, line_count - 1),  # exclude header
            })
    return {"tables": result}
# -- Config & Specs -----------------------------------------------------------
def get_config():
    """Return the ``inputs`` and ``trailer_specs`` sections of the configuration."""
    config = load_config()
    exposed_sections = ("inputs", "trailer_specs")
    return {section: config[section] for section in exposed_sections}
def get_components():
    """Return the configured components grouped by category.

    Each item under ``lookups.components.<category>`` is paired with the
    field names in ``lookups.components.schema`` to form a dict. Categories
    absent from the configuration are omitted.
    """
    components = load_config()["lookups"]["components"]
    field_names = components["schema"]
    categories = (
        "living", "cooking", "toilet", "shower",
        "laundry", "actuation", "dumping", "hvac",
    )
    return {
        name: [dict(zip(field_names, entry)) for entry in components[name]]
        for name in categories
        if name in components
    }
# -- Budget Endpoints ---------------------------------------------------------
def get_power_budget():
    """Daily power budget breakdown in kWh.

    Delegates to ``generate_mock_data.calc_power_budget`` with the values
    currently stored under ``inputs.params`` in the configuration.
    """
    cfg = load_config()
    from generate_mock_data import calc_power_budget

    params = cfg["inputs"]["params"]
    # Positional order expected by calc_power_budget after the config itself.
    argument_names = ("user_type", "num_people", "temperature", "hvac_runtime_hrs")
    arguments = [params[name]["value"] for name in argument_names]
    return calc_power_budget(cfg, *arguments)
def get_water_budget():
    """Daily water budget breakdown in litres.

    Delegates to ``generate_mock_data.calc_water_budget`` with the user type
    and head count currently stored under ``inputs.params``.
    """
    cfg = load_config()
    from generate_mock_data import calc_water_budget

    params = cfg["inputs"]["params"]
    user_type = params["user_type"]["value"]
    num_people = params["num_people"]["value"]
    return calc_water_budget(cfg, user_type, num_people)
# -- Time Series Endpoints ----------------------------------------------------
# -- Hourly Profile (MUST be before /{resolution} routes) ---------------------
def get_hourly_profile():
    """Average hourly power profile across all rows of the 1H power table.

    Returns a list of 24 entries, one per hour of day. Hours that have data
    carry the mean solar energy, mean battery percentage, the mean energy of
    each circuit and the sum of those circuit means; hours without data only
    carry ``"hour"``.
    """
    from collections import defaultdict
    from datetime import datetime as dt

    circuits = ["HVAC", "Lighting", "Devices", "Fridge", "WaterPump", "Cooking", "Inverter"]

    def mean(values):
        return sum(values) / len(values)

    # hour of day -> series name -> samples collected for that hour
    samples = defaultdict(lambda: defaultdict(list))
    for record in read_csv(DATA_DIR / "power" / "1H.csv"):
        timestamp = dt.fromisoformat(record["Time"].replace("Z", ""))
        bucket = samples[timestamp.hour]
        bucket["solar"].append(record.get("Solar_Total_kWh", 0))
        for name in circuits:
            bucket[name].append(record.get(f"{name}_Total_kWh", 0))
        bucket["battery_pct"].append(record.get("Battery_Level_Pct", 0))

    profile = []
    for hour in range(24):
        entry = {"hour": hour}
        bucket = samples.get(hour)
        if bucket is not None:
            entry["solar_kwh"] = round(mean(bucket["solar"]), 4)
            entry["battery_pct"] = round(mean(bucket["battery_pct"]), 1)
            circuit_means = {
                f"{name}_kwh": round(mean(bucket[name]), 4) for name in circuits
            }
            entry.update(circuit_means)
            # The total is built from the already-rounded circuit means.
            entry["total_load_kwh"] = round(sum(circuit_means.values()), 4)
        profile.append(entry)
    return profile
def get_water_hourly_profile():
    """Average hourly water usage profile from the 1H water table.

    Returns a list of 24 entries, one per hour of day. Hours that have data
    carry the mean litres per circuit, the sum of those means and the mean
    fresh-tank percentage; hours without data only carry ``"hour"``.
    """
    from collections import defaultdict
    from datetime import datetime as dt

    # Litres treated as a 100 % full fresh tank when a row only provides the
    # level in litres.
    fresh_tank_capacity_l = 378.5
    circuits = ["Shower", "Kitchen", "Toilet"]

    hourly = read_csv(DATA_DIR / "water" / "1H.csv")
    hour_buckets = defaultdict(lambda: defaultdict(list))
    for row in hourly:
        ts = dt.fromisoformat(row["Time"].replace("Z", ""))
        bucket = hour_buckets[ts.hour]
        for c in circuits:
            bucket[c].append(row.get(f"{c}_Total_L", 0))

        fresh_pct = row.get("FreshTank_Level_Pct")
        if fresh_pct is None:
            # Derive the percentage only when the percent column is really
            # unavailable, so an unusable litre value cannot break rows that
            # already carry a percentage.
            fresh_pct = (row.get("FreshTank_Level_L") or 0) / fresh_tank_capacity_l * 100
        bucket["fresh_pct"].append(fresh_pct)

    profile = []
    for h in range(24):
        entry = {"hour": h}
        if h in hour_buckets:
            b = hour_buckets[h]
            for c in circuits:
                entry[f"{c}_L"] = round(sum(b[c]) / len(b[c]), 3)
            # The total is built from the already-rounded circuit means.
            entry["total_L"] = round(sum(entry.get(f"{c}_L", 0) for c in circuits), 3)
            entry["fresh_pct"] = round(sum(b["fresh_pct"]) / len(b["fresh_pct"]), 1)
        profile.append(entry)
    return profile
def get_power_peaks():
    """Rank the rows of the 1H power table by total circuit load.

    Returns the ten highest-load and the ten lowest-load hours, each with its
    timestamp, rounded total load, solar energy and battery percentage.
    """
    circuits = ["HVAC", "Lighting", "Devices", "Fridge", "WaterPump", "Cooking", "Inverter"]

    def by_load(item):
        return item["total_kwh"]

    peaks = [
        {
            "time": record["Time"],
            "total_kwh": round(
                sum(record.get(f"{name}_Total_kWh", 0) for name in circuits), 4
            ),
            "solar_kwh": record.get("Solar_Total_kWh", 0),
            "battery_pct": record.get("Battery_Level_Pct", 0),
        }
        for record in read_csv(DATA_DIR / "power" / "1H.csv")
    ]

    descending = sorted(peaks, key=by_load, reverse=True)
    # Re-sorting the descending list keeps tie order identical to sorting in place.
    ascending = sorted(descending, key=by_load)
    return {
        "top_10_peak_hours": descending[:10],
        "bottom_10_hours": ascending[:10],
    }
def get_power_data(
    resolution: str,
    limit: Optional[int] = Query(None, ge=1, le=50000),
    offset: int = Query(0, ge=0),
    day: Optional[int] = Query(None, ge=1, le=30, description="Filter by day number"),
):
    """Power time-series data at the given resolution.

    Args:
        resolution: Resolution name, case-insensitive; must be one of
            ``POWER_RESOLUTIONS``.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip. With ``day`` set, rows are skipped
            after the day filter has been applied.
        day: 1-based day number, counted from the date of the first row.

    Returns:
        ``{"resolution", "count", "data"}``, or a 400 ``JSONResponse`` when
        the resolution is not recognised.
    """
    res = resolution.upper()
    if res not in POWER_RESOLUTIONS:
        return JSONResponse({"error": f"Invalid resolution. Use: {POWER_RESOLUTIONS}"}, 400)
    path = DATA_DIR / "power" / f"{res}.csv"

    if day is None:
        rows = read_csv(path, limit=limit, offset=offset)
    else:
        from datetime import datetime

        def row_date(row):
            return datetime.fromisoformat(row["Time"].replace("Z", "")).date()

        # The day filter needs the first row's date, so read everything first.
        rows = read_csv(path)
        if rows:
            start_date = row_date(rows[0])
            rows = [r for r in rows if (row_date(r) - start_date).days + 1 == day]
        # Paginate the filtered rows; offset applies even when no limit is given.
        end = offset + limit if limit else None
        rows = rows[offset:end]
    return {"resolution": res, "count": len(rows), "data": rows}
def get_water_data(
    resolution: str,
    limit: Optional[int] = Query(None, ge=1, le=50000),
    offset: int = Query(0, ge=0),
    day: Optional[int] = Query(None, ge=1, le=30),
):
    """Water time-series data at the given resolution.

    Args:
        resolution: Resolution name, case-insensitive; must be one of
            ``WATER_RESOLUTIONS``.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip. With ``day`` set, rows are skipped
            after the day filter has been applied.
        day: 1-based day number, counted from the date of the first row.

    Returns:
        ``{"resolution", "count", "data"}``, or a 400 ``JSONResponse`` when
        the resolution is not recognised.
    """
    res = resolution.upper()
    if res not in WATER_RESOLUTIONS:
        return JSONResponse({"error": f"Invalid resolution. Use: {WATER_RESOLUTIONS}"}, 400)
    path = DATA_DIR / "water" / f"{res}.csv"

    if day is None:
        rows = read_csv(path, limit=limit, offset=offset)
    else:
        from datetime import datetime

        def row_date(row):
            return datetime.fromisoformat(row["Time"].replace("Z", "")).date()

        # The day filter needs the first row's date, so read everything first.
        rows = read_csv(path)
        if rows:
            start_date = row_date(rows[0])
            rows = [r for r in rows if (row_date(r) - start_date).days + 1 == day]
        # Paginate the filtered rows; offset applies even when no limit is given.
        end = offset + limit if limit else None
        rows = rows[offset:end]
    return {"resolution": res, "count": len(rows), "data": rows}
# -- Summary / Aggregated Stats -----------------------------------------------
def get_summary():
    """Trip-wide summary statistics.

    Combines the daily power, daily water and hourly power tables into
    totals, battery statistics, final tank levels and a per-day breakdown.
    Returns ``{"error": "No data"}`` when the daily power table is empty.
    """
    daily_power = read_csv(DATA_DIR / "power" / "1DAY.csv")
    daily_water = read_csv(DATA_DIR / "water" / "1DAY.csv")
    hourly_power = read_csv(DATA_DIR / "power" / "1H.csv")
    if not daily_power:
        return {"error": "No data"}

    circuits = ["HVAC", "Lighting", "Devices", "Fridge", "WaterPump", "Cooking", "Inverter", "Unmetered"]

    def column_total(rows, column):
        """Sum one column over rows, counting a missing cell as 0."""
        return sum(r.get(column, 0) for r in rows)

    def consumption_of(day_row):
        """Energy drawn by all circuits in a single daily row."""
        return sum(day_row.get(f"{name}_Total_kWh", 0) for name in circuits)

    total_solar = column_total(daily_power, "Solar_Total_kWh")
    total_shore = column_total(daily_power, "Shore_Total_kWh")
    total_consumption = sum(consumption_of(day_row) for day_row in daily_power)

    # Per-circuit totals
    circuit_totals = {
        name: round(column_total(daily_power, f"{name}_Total_kWh"), 3)
        for name in circuits
    }

    # Battery stats come from the hourly table; fixed fallbacks when it is empty.
    if hourly_power:
        min_battery = min(h.get("Battery_Level_Pct", 100) for h in hourly_power)
        max_battery = max(h.get("Battery_Level_Pct", 0) for h in hourly_power)
        avg_battery = column_total(hourly_power, "Battery_Level_Pct") / len(hourly_power)
        last_power = hourly_power[-1]
    else:
        min_battery, max_battery, avg_battery = 0, 100, 0
        last_power = {}

    # Water totals and final tank levels (an empty table sums to 0).
    total_fresh_used = column_total(daily_water, "Pump_Total_L")
    last_water = daily_water[-1] if daily_water else {}

    # Daily breakdown: water rows are matched to power rows by position.
    daily_breakdown = []
    for day_number, power_row in enumerate(daily_power, start=1):
        water_row = daily_water[day_number - 1] if day_number <= len(daily_water) else {}
        daily_breakdown.append({
            "day": day_number,
            "solar_kwh": round(power_row.get("Solar_Total_kWh", 0), 2),
            "consumption_kwh": round(consumption_of(power_row), 2),
            "shore_kwh": round(power_row.get("Shore_Total_kWh", 0), 2),
            "battery_end_pct": power_row.get("Battery_Level_Pct", 0),
            "fresh_used_L": round(water_row.get("Pump_Total_L", 0), 1),
            "fresh_remaining_pct": round(water_row.get("FreshTank_Level_Pct", 0), 1),
        })

    battery = {
        "min_pct": round(min_battery, 1),
        "max_pct": round(max_battery, 1),
        "avg_pct": round(avg_battery, 1),
        "current_pct": last_power.get("Battery_Level_Pct", 0),
    }
    water = {
        "total_fresh_used_L": round(total_fresh_used, 1),
        "fresh_remaining_L": last_water.get("FreshTank_Level_L", 0),
        "fresh_remaining_pct": last_water.get("FreshTank_Level_Pct", 0),
        "grey_level_L": last_water.get("GreyTank_Level_L", 0),
        "grey_level_pct": last_water.get("GreyTank_Level_Pct", 0),
        "black_level_L": last_water.get("BlackTank_Level_L", 0),
        "black_level_pct": last_water.get("BlackTank_Level_Pct", 0),
    }
    return {
        "trip_days": len(daily_power),
        "total_solar_kwh": round(total_solar, 2),
        "total_shore_kwh": round(total_shore, 2),
        "total_consumption_kwh": round(total_consumption, 2),
        # The 0.01 floor avoids dividing by zero when nothing was consumed.
        "self_sufficiency_pct": round(total_solar / max(total_consumption, 0.01) * 100, 1),
        "circuit_totals_kwh": circuit_totals,
        "battery": battery,
        "water": water,
        "daily_breakdown": daily_breakdown,
    }
# -- Serve Frontend -----------------------------------------------------------
# -- Data Regeneration --------------------------------------------------------
| from pydantic import BaseModel | |
class GenerateRequest(BaseModel):
    """Parameters accepted by ``regenerate_data``; every field has a default."""

    # Written to the config and passed to the generator as --user.
    user_type: str = "Typical"
    # Written to the config and passed to the generator as --people.
    num_people: int = 2
    # Written to the config and passed to the generator as --days.
    trip_duration_days: int = 5
    # Written to the config and passed to the generator as --temp.
    temperature: str = "Hot"
    # Written to the config only; not passed on the generator command line.
    sunlight: str = "Hi- Sunny"
    # Written to the config only; not passed on the generator command line.
    humidity: str = "Comfortable"
    # Passed to the generator as --seed; not written to the config.
    seed: int = 42
def regenerate_data(req: GenerateRequest):
    """Regenerate the mock data with new parameters.

    The request values are first written into the configuration file, then
    ``generate_mock_data.py`` is run as a subprocess to rebuild the CSV
    output. The configuration is saved before the generator runs, so a failed
    run still leaves the new parameters on disk.

    Returns:
        A status dict on success, or a 500 ``JSONResponse`` carrying an error
        message when the generator exits non-zero or exceeds the time limit.
    """
    import subprocess
    import sys

    timeout_seconds = 120

    # Persist the requested parameters so config-driven endpoints see them.
    cfg = load_config()
    params = cfg["inputs"]["params"]
    for name in (
        "user_type",
        "num_people",
        "trip_duration_days",
        "temperature",
        "sunlight",
        "humidity",
    ):
        params[name]["value"] = getattr(req, name)
    with open(CONFIG_PATH, "w") as f:
        json.dump(cfg, f, indent=2)

    # Run the generator. Arguments are passed as a list, so no shell is involved.
    command = [
        sys.executable, "generate_mock_data.py",
        "--config", str(CONFIG_PATH),
        "--out", str(DATA_DIR),
        "--seed", str(req.seed),
        "--user", req.user_type,
        "--people", str(req.num_people),
        "--days", str(req.trip_duration_days),
        "--temp", req.temperature,
    ]
    try:
        result = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        # Report the timeout as a regular error payload instead of letting the
        # exception escape the handler.
        return JSONResponse(
            {"error": f"Data generation timed out after {timeout_seconds} seconds"}, 500
        )
    if result.returncode != 0:
        return JSONResponse({"error": result.stderr}, 500)
    return {"status": "ok", "message": f"Generated {req.trip_duration_days}-day trip for {req.user_type} profile"}
# -- Serve Frontend (catch-all, must be LAST) ---------------------------------
# Directory containing index.html and the other frontend files served below;
# relative, so it resolves against the working directory.
STATIC_DIR = Path("static")
def serve_index():
    """Serve the frontend entry page, ``index.html`` inside ``STATIC_DIR``."""
    index_page = STATIC_DIR.joinpath("index.html")
    return FileResponse(index_page)
def serve_static(path: str):
    """Serve a file from ``STATIC_DIR``, falling back to ``index.html``.

    Args:
        path: Requested path, interpreted relative to ``STATIC_DIR``.

    Returns:
        A ``FileResponse`` for the requested file when it is a regular file
        located inside ``STATIC_DIR``; otherwise a ``FileResponse`` for
        ``index.html``.
    """
    index_page = STATIC_DIR / "index.html"
    try:
        static_root = STATIC_DIR.resolve()
        # Resolving collapses ".." segments and symlinks, and joining with an
        # absolute request path discards the root, so compare the real target
        # against the real static root before serving anything.
        candidate = (static_root / path).resolve()
    except (OSError, ValueError):
        # Unresolvable input (for example an embedded NUL byte): serve the index.
        return FileResponse(index_page)
    if candidate.is_relative_to(static_root) and candidate.is_file():
        return FileResponse(candidate)
    return FileResponse(index_page)
# Script entry point: running this file directly starts uvicorn with the app,
# bound to all interfaces (0.0.0.0) on port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)