# Spaces: Running
| import os | |
| import numpy as np | |
| import cv2 | |
| import csv | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from matplotlib.ticker import MaxNLocator | |
| from export_json import export_json | |
# Formal MIS palette
# Text colours
C_PRIMARY = "#1e293b"  # chart titles and value labels
C_ACCENT = "#334155"   # axis labels, tick labels, annotation text
# Series colours, one per chart
C_IN = "#059669"       # incoming wedge of the direction pie
C_OUT = "#dc2626"      # outgoing wedge of the direction pie
C_FLOW = "#2563eb"     # flow-over-time histogram
C_CONG = "#d97706"     # congestion index lines and fill
C_CONF = "#7c3aed"     # confidence distribution histogram
C_BAR = "#0f766e"      # class dominance bars
# Chrome
C_GRID = "#e2e8f0"     # grid lines, visible spines, annotation box edges
C_BG = "#ffffff"       # figure background and bar/wedge edge colour
def _style(ax, title, xlabel="", ylabel=""):
    """Apply the shared report look to a Matplotlib axes.

    Sets the title, the axis labels that were supplied (empty strings are
    skipped), tick styling, hides the top/right spines, tints the remaining
    spines, and draws a horizontal grid behind the plotted artists.
    """
    ax.set_title(title, fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)

    label_kwargs = {"fontsize": 9, "fontweight": "600", "color": C_ACCENT}
    if xlabel:
        ax.set_xlabel(xlabel, **label_kwargs)
    if ylabel:
        ax.set_ylabel(ylabel, **label_kwargs)

    ax.tick_params(labelsize=8, colors=C_ACCENT)

    for side in ("top", "right"):
        ax.spines[side].set_visible(False)
    for side in ("left", "bottom"):
        ax.spines[side].set_color(C_GRID)

    # Horizontal grid lines only, kept underneath bars and lines.
    ax.yaxis.grid(True, color=C_GRID, linewidth=0.6, alpha=0.8)
    ax.set_axisbelow(True)
def _save(fig, path, fmt="png"):
    """Write ``fig`` to disk and always release it afterwards.

    Args:
        fig: Matplotlib figure to save.
        path: Destination path. When ``fmt`` is ``"pdf"`` its extension is
            replaced with ``.pdf``.
        fmt: ``"pdf"`` to write a PDF; any other value leaves ``path`` as is.

    Raises:
        Whatever ``fig.savefig`` raises; the figure is still closed.
    """
    if fmt == "pdf":
        # splitext only inspects the final path component, so a dot in a
        # directory name is never mistaken for the extension separator.
        path = os.path.splitext(path)[0] + ".pdf"
    try:
        fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG, edgecolor="none")
    finally:
        # Close even when savefig fails; pyplot otherwise keeps the figure
        # registered and its memory is never reclaimed.
        plt.close(fig)
def direction_pie(total_in, total_out, out_dir, fmt="png"):
    """Render the incoming/outgoing split as a pie chart.

    Returns the written file name (``direction_pie.png`` or
    ``direction_pie.pdf``), or ``None`` when the two counts sum to zero.
    """
    grand_total = total_in + total_out
    if grand_total == 0:
        return None

    both_present = total_in > 0 and total_out > 0
    fig, ax = plt.subplots(figsize=(5, 5), facecolor=C_BG)
    _, _, pct_labels = ax.pie(
        [total_in, total_out],
        labels=[f"Incoming ({total_in})", f"Outgoing ({total_out})"],
        autopct="%1.1f%%",
        startangle=90,
        colors=[C_IN, C_OUT],
        # The wedge separator stroke is only drawn when both wedges are non-empty.
        wedgeprops={"edgecolor": C_BG, "linewidth": 2.5 if both_present else 0},
        textprops={"fontsize": 10, "fontweight": "600", "color": C_PRIMARY},
    )

    # Percentage labels sit on the wedges, so they use the background colour.
    for label in pct_labels:
        label.set_fontsize(11)
        label.set_fontweight("700")
        label.set_color(C_BG)

    ax.set_title("Directional Split", fontsize=13, fontweight="700", color=C_PRIMARY, pad=16)
    ax.text(
        0, -1.35, f"Total: {grand_total} vehicles",
        ha="center", fontsize=9, color=C_ACCENT, fontweight="500",
    )

    _save(fig, os.path.join(out_dir, "direction_pie.png"), fmt)
    if fmt == "pdf":
        return "direction_pie.pdf"
    return "direction_pie.png"
def flow_histogram(flow_times, out_dir, fmt="png"):
    """Plot a histogram of crossing times and annotate the fullest bin.

    Returns ``flow_over_time.png`` / ``flow_over_time.pdf``, or ``None``
    when ``flow_times`` is empty.
    """
    if not flow_times:
        return None

    # One bin per distinct time value, clamped to the range 5..30.
    n_bins = max(5, len(set(flow_times)))
    if n_bins > 30:
        n_bins = 30

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    counts, edges, _ = ax.hist(
        flow_times, bins=n_bins, color=C_FLOW, alpha=0.85, edgecolor=C_BG, linewidth=0.8,
    )
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Traffic Flow Over Time", "Time (seconds)", "Vehicles Crossed")

    # Report the fullest bin by its count and the midpoint of its edges.
    busiest = int(np.argmax(counts))
    bin_centre = (edges[busiest] + edges[busiest + 1]) / 2
    note = f"Peak: {int(counts[busiest])} vehicles at {bin_centre:.1f}s"
    ax.text(
        0.98, 0.95, note,
        transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
        bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID),
    )

    _save(fig, os.path.join(out_dir, "flow_over_time.png"), fmt)
    if fmt == "pdf":
        return "flow_over_time.pdf"
    return "flow_over_time.png"
def congestion_chart(congestion, out_dir, fmt="png"):
    """Plot the congestion samples with a moving-average trend line.

    The raw samples are drawn faintly against their index (labelled
    "Frame"), the moving average is drawn on top, and the mean is marked
    with a dashed horizontal line.

    Args:
        congestion: Non-empty sequence of numeric samples.
        out_dir: Directory the chart is written to.
        fmt: ``"pdf"`` for a PDF; anything else produces a PNG.

    Returns:
        ``congestion_index.png`` / ``congestion_index.pdf``, or ``None``
        when ``congestion`` is empty.
    """
    if not congestion:
        return None

    n = len(congestion)
    fig, ax = plt.subplots(figsize=(10, 4), facecolor=C_BG)
    x = range(n)
    ax.fill_between(x, congestion, alpha=0.08, color=C_CONG)
    ax.plot(x, congestion, alpha=0.25, color=C_CONG, linewidth=0.5)

    # np.convolve(mode="same") returns max(len(a), len(kernel)) samples, so
    # the window must never exceed the series length or the trend line no
    # longer lines up with x (and ax.plot raises for 1-2 sample inputs).
    win = min(30, max(3, n // 10), n)
    kernel = np.ones(win) / win
    smooth = np.convolve(congestion, kernel, mode="same")
    # "same" zero-pads beyond both ends, which drags the average down near
    # the edges. Dividing by the share of the window that overlaps real
    # samples turns each value into the mean of the samples actually present.
    coverage = np.convolve(np.ones(n), kernel, mode="same")
    smooth = smooth / coverage
    ax.plot(x, smooth, linewidth=2, color=C_CONG)

    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Congestion Index", "Frame", "Active Vehicles")

    avg = np.mean(congestion)
    peak = max(congestion)
    ax.axhline(avg, color=C_ACCENT, linewidth=0.8, linestyle="--", alpha=0.5)
    ax.text(0.98, 0.95, f"Peak: {peak} | Avg: {avg:.1f}",
            transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
            bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID))

    path = os.path.join(out_dir, "congestion_index.png")
    _save(fig, path, fmt)
    ext = "pdf" if fmt == "pdf" else "png"
    return f"congestion_index.{ext}"
def class_dominance(class_in, class_out, model_classes, out_dir, fmt="png"):
    """Bar chart of combined in+out counts per class, largest first.

    Class keys are converted with ``int()`` and looked up in
    ``model_classes``; keys without an entry are labelled ``cls_<key>``.
    Returns ``class_dominance.png`` / ``class_dominance.pdf``, or ``None``
    when there are no classes or every count is zero.
    """
    class_ids = set([*class_in.keys(), *class_out.keys()])
    totals = {cid: class_in.get(cid, 0) + class_out.get(cid, 0) for cid in class_ids}
    if not totals or sum(totals.values()) == 0:
        return None

    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    classes = []
    values = []
    for cid, count in ranked:
        classes.append(model_classes.get(int(cid), f"cls_{cid}"))
        values.append(count)

    n = len(classes)
    positions = range(n)
    # Bars get narrower as more classes appear, within 0.15..0.45.
    bar_width = min(0.45, max(0.15, 0.6 / max(n, 1)))

    fig, ax = plt.subplots(figsize=(10, 4.5), facecolor=C_BG)
    bars = ax.bar(positions, values, width=bar_width, color=C_BAR,
                  edgecolor=C_BG, linewidth=0.5, zorder=3)
    ax.set_xticks(positions)
    ax.set_xticklabels(classes, rotation=35, ha="right", fontsize=9, fontweight="500")
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))

    # Print each count just above its bar.
    for bar, count in zip(bars, values):
        label_x = bar.get_x() + bar.get_width() / 2
        label_y = bar.get_height() + 0.15
        ax.text(label_x, label_y, str(count), ha="center", va="bottom",
                fontsize=9, fontweight="700", color=C_PRIMARY)

    _style(ax, "Class Dominance", "", "Vehicle Count")

    summary = f"Total: {sum(values)} vehicles | {n} classes detected"
    ax.text(
        0.98, 0.95, summary,
        transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
        bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID),
    )

    _save(fig, os.path.join(out_dir, "class_dominance.png"), fmt)
    if fmt == "pdf":
        return "class_dominance.pdf"
    return "class_dominance.png"
def confidence_dist(conf_scores, out_dir, fmt="png"):
    """Histogram of confidence scores with mean/median annotation.

    Returns ``confidence_dist.png`` / ``confidence_dist.pdf``, or ``None``
    when ``conf_scores`` is empty.
    """
    if not conf_scores:
        return None

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    ax.hist(conf_scores, bins=30, color=C_CONF, alpha=0.85, edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Detection Confidence Distribution", "Confidence Score", "Detections")

    mean_score = np.mean(conf_scores)
    median_score = np.median(conf_scores)
    # The dashed vertical line marks the mean.
    ax.axvline(mean_score, color=C_PRIMARY, linewidth=1, linestyle="--", alpha=0.6)

    summary = f"Mean: {mean_score:.3f} | Median: {median_score:.3f} | N={len(conf_scores)}"
    ax.text(
        0.98, 0.95, summary,
        transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
        bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID),
    )

    _save(fig, os.path.join(out_dir, "confidence_dist.png"), fmt)
    if fmt == "pdf":
        return "confidence_dist.pdf"
    return "confidence_dist.png"
def export_csv(raw_events, out_dir):
    """Write ``raw_events`` to ``raw_data.csv`` inside ``out_dir``.

    Args:
        raw_events: Sequence of rows (each an iterable of cell values).
            ``None``, an empty sequence, or a single row (presumably just
            the header - confirm against the producer) is treated as
            "nothing to export".
        out_dir: Existing directory to write into.

    Returns:
        ``"raw_data.csv"`` when the file was written, otherwise ``None``.
    """
    if not raw_events or len(raw_events) <= 1:
        return None
    path = os.path.join(out_dir, "raw_data.csv")
    # Explicit UTF-8: the locale default encoding can reject non-ASCII cells
    # on some platforms. newline="" lets the csv module control line endings.
    with open(path, mode="w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(raw_events)
    return "raw_data.csv"
def _read_first_frame(video_path):
    """Return the first decoded frame of ``video_path``, or ``None`` on failure."""
    cap = cv2.VideoCapture(video_path)
    try:
        ok, frame = cap.read()
    finally:
        # Release the capture handle even if decoding raises.
        cap.release()
    return frame if ok else None


def _confidence_density(points, height, width):
    """Accumulate one confidence-weighted Gaussian stamp per point, then blur."""
    density = np.zeros((height, width), dtype=np.float32)

    # Kernel radius: ~3% of the shorter dimension, min 20px
    kernel_r = max(20, int(min(height, width) * 0.03))
    kernel_size = kernel_r * 2 + 1
    # Pre-build a unit Gaussian kernel to stamp for each detection
    kx = cv2.getGaussianKernel(kernel_size, kernel_r / 2.5)
    unit_kernel = (kx @ kx.T).astype(np.float32)  # shape (ks, ks)

    for pt in points:
        cx, cy = int(pt[0]), int(pt[1])
        # Support both [cx, cy] and [cx, cy, conf] formats
        conf = float(pt[2]) if len(pt) > 2 else 1.0

        # Kernel bounding box, clipped to the frame
        x0 = max(0, cx - kernel_r)
        y0 = max(0, cy - kernel_r)
        x1 = min(width, cx + kernel_r + 1)
        y1 = min(height, cy + kernel_r + 1)
        if x1 <= x0 or y1 <= y0:
            continue  # stamp lies entirely outside the frame

        # Matching slice of the kernel (same extent as the clipped box)
        kx0 = x0 - (cx - kernel_r)
        ky0 = y0 - (cy - kernel_r)
        kx1 = kx0 + (x1 - x0)
        ky1 = ky0 + (y1 - y0)
        density[y0:y1, x0:x1] += conf * unit_kernel[ky0:ky1, kx0:kx1]

    # Mild additional smoothing pass
    return cv2.GaussianBlur(density, (31, 31), 0)


def _blend_heatmap(frame, density, max_val):
    """Colour-map ``density`` and alpha-blend it over ``frame``; returns a new image."""
    density_norm = (density / max_val * 255.0).astype(np.uint8)
    heatmap_color = cv2.applyColorMap(density_norm, cv2.COLORMAP_JET)

    # Only paint where density is meaningful (>4% of max)
    threshold = int(0.04 * 255)
    mask = density_norm > threshold

    # Per-pixel alpha follows the density, capped at 0.72
    alpha = np.clip((density_norm.astype(np.float32) / 255.0) * 0.72, 0, 0.72)[..., None]
    blended = ((1.0 - alpha) * frame + alpha * heatmap_color).astype(np.uint8)
    return np.where(mask[..., None], blended, frame)


def _draw_heatmap_legend(overlay):
    """Draw the colour-bar legend in the bottom-left corner of ``overlay`` in place.

    The legend is skipped when the frame is too small to hold it; slicing
    would otherwise wrap around (negative indices) or the colour-bar
    assignment would fail with a shape mismatch.
    """
    h, w = overlay.shape[:2]
    bar_w, bar_h = min(240, w // 4), 14
    bar_x, bar_y = 16, h - bar_h - 36
    panel_pad = 10

    top = bar_y - panel_pad
    bottom = bar_y + bar_h + panel_pad + 18
    left = bar_x - panel_pad
    right = bar_x + bar_w + panel_pad
    if bar_w < 1 or top < 0 or right > w:
        return

    gradient = np.tile(np.arange(256, dtype=np.uint8), (bar_h, 1))
    gradient_color = cv2.applyColorMap(gradient, cv2.COLORMAP_JET)  # (bar_h, 256, 3)
    gradient_resized = cv2.resize(gradient_color, (bar_w, bar_h))

    # Semi-transparent dark panel behind the legend
    panel = overlay[top:bottom, left:right]
    dark = np.full_like(panel, 15)
    overlay[top:bottom, left:right] = cv2.addWeighted(panel, 0.35, dark, 0.65, 0)
    overlay[bar_y:bar_y + bar_h, bar_x:bar_x + bar_w] = gradient_resized

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_s = 0.35
    thickness = 1
    label_color = (220, 220, 220)
    label_y = bar_y + bar_h + 14

    cv2.putText(overlay, "Low Confidence", (bar_x, label_y),
                font, font_s, label_color, thickness, cv2.LINE_AA)
    # Right-align the high-end label with the end of the bar
    high_label = "High Confidence"
    (tw, _), _ = cv2.getTextSize(high_label, font, font_s, thickness)
    cv2.putText(overlay, high_label, (bar_x + bar_w - tw, label_y),
                font, font_s, label_color, thickness, cv2.LINE_AA)

    # Title label above the bar
    cv2.putText(overlay, "Detection Confidence Density (xAI)",
                (bar_x, bar_y - panel_pad + 8),
                font, 0.38, (180, 180, 180), thickness, cv2.LINE_AA)


def spatial_heatmap(heatmap_points, video_path, out_dir, fmt="png"):
    """
    Confidence-Weighted Spatial Density Map (xAI / Explainability Overlay).

    Each detection contributes a Gaussian kernel to the accumulation grid,
    weighted by the confidence value supplied with that detection, so the
    map shows where accumulated detector confidence is highest. It is built
    purely from detection outputs - no gradients or backpropagation are
    involved (unlike Grad-CAM).

    Algorithm:
      1. For each detection (cx, cy, conf): stamp a 2D Gaussian kernel of
         radius proportional to frame size, weighted by conf (1.0 when the
         point has no third element).
      2. Accumulate all weighted kernels into a float32 density grid.
      3. Apply a mild additional Gaussian blur for visual smoothness.
      4. Normalize to [0, 255], apply COLORMAP_JET.
      5. Blend over the first video frame only where density > threshold.
      6. Annotate with a legend showing the confidence scale (skipped on
         frames too small to hold it).

    Args:
        heatmap_points: Sequence of ``[cx, cy]`` or ``[cx, cy, conf]`` points.
        video_path: Path of the video whose first frame is the background.
        out_dir: Directory the image is written to.
        fmt: ``"pdf"`` for a Matplotlib-rendered PDF; anything else writes a
            PNG via OpenCV.

    Returns:
        ``"heatmap.pdf"`` / ``"heatmap.png"``, or ``None`` when there are no
        points, the video is missing or unreadable, the density is empty, or
        the PNG could not be written.
    """
    if not heatmap_points or not video_path or not os.path.exists(video_path):
        return None

    frame = _read_first_frame(video_path)
    if frame is None:
        return None

    h, w = frame.shape[:2]
    density = _confidence_density(heatmap_points, h, w)
    max_val = np.max(density)
    if max_val <= 0:
        return None

    overlay = _blend_heatmap(frame, density, max_val)
    _draw_heatmap_legend(overlay)

    if fmt == "pdf":
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        fig, ax = plt.subplots(figsize=(12, 7), facecolor=C_BG)
        try:
            ax.imshow(overlay_rgb)
            ax.set_title("Detection Confidence Density Map \u00b7 xAI Spatial Explanation",
                         fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
            ax.axis('off')
            fig.text(0.5, 0.01,
                     "Brighter regions = higher accumulated detector confidence. "
                     "Generated from confidence-weighted Gaussian kernel density estimation.",
                     ha="center", fontsize=7, color=C_ACCENT)
            path = os.path.join(out_dir, "heatmap.pdf")
            fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG, edgecolor="none")
        finally:
            # Always release the figure, even if rendering or saving fails.
            plt.close(fig)
        return "heatmap.pdf"

    path = os.path.join(out_dir, "heatmap.png")
    # cv2.imwrite reports failure through its return value instead of raising.
    if not cv2.imwrite(path, overlay):
        return None
    return "heatmap.png"
def generate_all(data, model_classes, out_dir, report_format="png"):
    """Generate every report artifact for one analysis run.

    Creates ``out_dir`` if needed, sets the Matplotlib font defaults, then
    produces the charts in a fixed order followed by the optional CSV and
    JSON exports.

    Args:
        data: Mapping with the run's results. Keys read here: ``class_in``,
            ``class_out``, ``flow_times``, ``congestion``, ``conf_scores``,
            ``heatmap_points``, ``video_path``, ``raw_events``,
            ``export_csv``, ``export_json``, ``video_meta``,
            ``engine_config``. All of them are optional.
        model_classes: Mapping passed through to ``class_dominance`` for
            resolving class labels.
        out_dir: Output directory.
        report_format: ``"pdf"`` (any letter case) for PDF charts; any other
            value produces PNG.

    Returns:
        List of the file names that were actually produced, in generation
        order; artifacts whose generator returned a falsy value are omitted.
    """
    os.makedirs(out_dir, exist_ok=True)
    plt.rcParams.update({
        "font.family": "sans-serif",
        "font.sans-serif": ["DejaVu Sans", "Arial", "Helvetica"],
        "axes.unicode_minus": False,
    })

    # Missing class tallies are treated like every other optional key
    # instead of aborting the whole report with a KeyError.
    class_in = data.get("class_in") or {}
    class_out = data.get("class_out") or {}
    total_in = sum(class_in.values())
    total_out = sum(class_out.values())

    # The chart functions compare against the exact string "pdf".
    fmt = str(report_format).strip().lower()

    produced = [
        direction_pie(total_in, total_out, out_dir, fmt),
        flow_histogram(data.get("flow_times", []), out_dir, fmt),
        congestion_chart(data.get("congestion", []), out_dir, fmt),
        class_dominance(class_in, class_out, model_classes, out_dir, fmt),
        confidence_dist(data.get("conf_scores", []), out_dir, fmt),
        spatial_heatmap(data.get("heatmap_points", []), data.get("video_path"), out_dir, fmt),
    ]
    if data.get("export_csv", False):
        produced.append(export_csv(data.get("raw_events", []), out_dir))
    if data.get("export_json", False):
        produced.append(export_json(
            data,
            data.get("video_meta", {}),
            data.get("engine_config", {}),
            out_dir,
        ))

    return [name for name in produced if name]