"""Gradio application for crack detection in images and videos.

Pipeline per frame:
  1. An ONNX detector (``MODEL_PATH``) proposes crack bounding boxes.
  2. Boxes are cleaned up with NMS and a nearby-box merge.
  3. Each box is assigned a crack type by a geometric heuristic and a
     severity by a second ONNX classifier (``SEVERITY_PATH``).
  4. The frame is annotated and a text report is produced.
"""
import os
import tempfile
import time
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Tuple

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import onnxruntime as ort

# -------------------
# Model Initialization
# -------------------
MODEL_PATH = "best2.onnx"
SEVERITY_PATH = "severity_model.onnx"
PROVIDERS = ["CPUExecutionProvider"]

# Initialize sessions (will be loaded when needed)
yolo_sess = None
severity_sess = None

SEVERITY_LABELS = ["Mild", "Moderate", "Severe"]
REPAIR_SUGGESTIONS = {
    "Mild": "Routine maintenance recommended.",
    "Moderate": "Schedule repair soon.",
    "Severe": "Immediate repair required!",
}

# Colors per crack type (BGR for OpenCV)
TYPE_COLORS = {
    "Transverse": (60, 200, 60),    # green
    "Longitudinal": (200, 120, 0),  # blue (B=200, G=120, R=0)
    "Alligator": (0, 165, 255),     # orange
    "Other": (180, 180, 180),       # gray
}

# A detection box: (x1, y1, x2, y2, confidence) in original-image pixels.
Box = Tuple[int, int, int, int, float]

ABOUT_MARKDOWN = """
## About this App

This Crack Detection System uses deep learning models to:
- **Detect cracks using YOLOv5** in images and videos
- **Classify crack types**: Transverse, Longitudinal, Alligator, Other
- **Assess severity using ResNet-18**: Mild, Moderate, Severe
- **Provide repair suggestions**

### How to use:
1. Upload an image or video
2. Click the detection button
3. View results with bounding boxes and detailed report

### Crack Types:
- **Transverse**: Wide and short cracks perpendicular to road direction
- **Longitudinal**: Tall and narrow cracks parallel to road direction
- **Alligator**: Network of interconnected cracks resembling alligator skin
- **Other**: Cracks that don't fit the above categories

### Severity Levels:
- **Mild**: Routine maintenance recommended
- **Moderate**: Schedule repair soon
- **Severe**: Immediate repair required!
"""


# -------------------
# Model Loading
# -------------------
def load_models() -> Tuple[str, str]:
    """Load both ONNX sessions on first use and return their input names.

    The sessions are cached in the module globals ``yolo_sess`` and
    ``severity_sess``, so repeated calls only pay for the name lookup.

    Returns:
        ``(yolo_input_name, severity_input_name)``.

    Raises:
        RuntimeError: if either model file cannot be loaded; the original
            exception is attached as ``__cause__``.
    """
    global yolo_sess, severity_sess
    if yolo_sess is None:
        try:
            yolo_sess = ort.InferenceSession(MODEL_PATH, providers=PROVIDERS)
        except Exception as e:
            raise RuntimeError(f"Failed to load YOLO model: {e}") from e
    if severity_sess is None:
        try:
            severity_sess = ort.InferenceSession(SEVERITY_PATH, providers=PROVIDERS)
        except Exception as e:
            raise RuntimeError(f"Failed to load severity model: {e}") from e
    return yolo_sess.get_inputs()[0].name, severity_sess.get_inputs()[0].name


# -------------------
# Utilities
# -------------------
def severity_infer(bgr_crop) -> Optional[str]:
    """Classify the severity of a cropped crack region.

    Args:
        bgr_crop: BGR image array of the crack region.

    Returns:
        One of ``SEVERITY_LABELS``, the class index as a string if the model
        emits an index outside that list, or ``None`` for an empty crop.
    """
    if bgr_crop is None or bgr_crop.size == 0:
        return None
    _, severity_in_name = load_models()
    rgb = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2RGB)
    rgb = cv2.resize(rgb, (224, 224))
    # HWC uint8 -> 1xCxHxW float32 scaled to [0, 1]
    inp = rgb.transpose(2, 0, 1)[None].astype(np.float32) / 255.0
    out = severity_sess.run(None, {severity_in_name: inp})[0]
    cls = int(np.argmax(out, axis=1)[0])
    return SEVERITY_LABELS[cls] if 0 <= cls < len(SEVERITY_LABELS) else str(cls)


def nms(dets, iou_thresh=0.5):
    """Greedy non-maximum suppression.

    Args:
        dets: sequence of ``(x1, y1, x2, y2, conf)`` tuples.
        iou_thresh: a box is dropped when its IoU with an already-kept,
            higher-scoring box exceeds this value.

    Returns:
        The kept detections, highest confidence first.
    """
    if len(dets) == 0:
        return []
    boxes = np.array(
        [[x1, y1, x2, y2, conf] for (x1, y1, x2, y2, conf) in dets], dtype=float
    )
    x1, y1, x2, y2, scores = boxes.T
    # Coordinates are treated as inclusive pixel indices, hence the +1.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        iou = inter / (areas[i] + areas[rest] - inter + 1e-6)
        inds = np.where(iou <= iou_thresh)[0]
        # inds indexes into `rest`, which is offset by one from `order`.
        order = order[inds + 1]
    return [
        tuple(float(v) if isinstance(v, np.generic) else v for v in dets[i])
        for i in keep
    ]


def merge_nearby_boxes(dets, dist_thresh=50):
    """Merge boxes whose centers lie close to a seed box's center.

    Boxes are visited in order; each unused box becomes a seed and absorbs
    every later unused box whose center is within ``dist_thresh`` pixels of
    the seed's center (distance is always measured to the seed, so grouping
    is not transitive). Each group becomes its bounding union with the
    group's maximum confidence.

    Returns:
        List of ``(x1, y1, x2, y2, conf)`` with int coordinates.
    """
    if not dets:
        return []
    merged = []
    used = set()
    centers = [((x1 + x2) / 2, (y1 + y2) / 2) for (x1, y1, x2, y2, _) in dets]
    for i, (x1, y1, x2, y2, conf) in enumerate(dets):
        if i in used:
            continue
        group = [(x1, y1, x2, y2, conf)]
        used.add(i)
        cx1, cy1 = centers[i]
        for j in range(i + 1, len(dets)):
            if j in used:
                continue
            cx2, cy2 = centers[j]
            if np.hypot(cx2 - cx1, cy2 - cy1) < dist_thresh:
                group.append(dets[j])
                used.add(j)
        gx1 = min(g[0] for g in group)
        gy1 = min(g[1] for g in group)
        gx2 = max(g[2] for g in group)
        gy2 = max(g[3] for g in group)
        gconf = max(g[4] for g in group)
        merged.append((int(gx1), int(gy1), int(gx2), int(gy2), float(gconf)))
    return merged


def heuristic_crack_type(x1, y1, x2, y2, dets, idx):
    """Classify a box as Transverse, Longitudinal, Alligator or Other.

    Transverse (wide & short), Longitudinal (tall & thin),
    Alligator (dense overlaps), Other.

    Args:
        x1, y1, x2, y2: the box being classified.
        dets: all detections in the frame.
        idx: index of this box within ``dets`` (skipped when counting overlaps).
    """
    w = max(1, x2 - x1)
    h = max(1, y2 - y1)
    aspect_ratio = w / h

    # Count strong overlaps with other boxes -> "Alligator" candidate
    overlaps = 0
    area = w * h
    for j, (xx1, yy1, xx2, yy2, _) in enumerate(dets):
        if j == idx:
            continue
        inter_x1 = max(x1, xx1)
        inter_y1 = max(y1, yy1)
        inter_x2 = min(x2, xx2)
        inter_y2 = min(y2, yy2)
        inter = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
        if inter > 0.40 * area:  # stricter to avoid false alligator
            overlaps += 1

    if overlaps >= 3:
        return "Alligator"
    if aspect_ratio >= 2.2:  # wide and relatively short
        return "Transverse"
    if aspect_ratio <= 0.8:  # tall and relatively narrow
        return "Longitudinal"
    return "Other"


def yolo_infer(bgr, conf_thresh=0.50, input_size=640):
    """Run the ONNX detector on a BGR frame and return cleaned-up boxes.

    Decodes the first output as rows of ``[x, y, w, h, conf, ...]`` expressed
    in the ``input_size`` x ``input_size`` network space.

    Args:
        bgr: BGR image array.
        conf_thresh: rows with confidence below this are discarded.
        input_size: square side the frame is resized to before inference.
            NOTE(review): presumably must match the exported model's input
            resolution - confirm before changing the default.

    Returns:
        List of ``(x1, y1, x2, y2, conf)`` in original-image pixels, after
        NMS and nearby-box merging.
    """
    yolo_in_name, _ = load_models()
    img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(img, (input_size, input_size))
    inp = resized.transpose(2, 0, 1)[None].astype(np.float32) / 255.0
    out = np.asarray(yolo_sess.run(None, {yolo_in_name: inp})[0][0])

    H, W = bgr.shape[:2]
    # Drop low-confidence rows in one vectorized step instead of looping
    # over every raw prediction in Python.
    candidates = out[out[:, 4] >= conf_thresh]

    dets = []
    for x, y, bw, bh, conf in candidates[:, :5]:
        # map from network space back to original resolution
        x1 = int((x - bw / 2) * W / input_size)
        y1 = int((y - bh / 2) * H / input_size)
        x2 = int((x + bw / 2) * W / input_size)
        y2 = int((y + bh / 2) * H / input_size)
        # clamp
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(W - 1, x2), min(H - 1, y2)
        if x2 > x1 and y2 > y1:
            dets.append((x1, y1, x2, y2, float(conf)))

    # Clean-up: NMS then merge very close boxes
    dets = nms(dets, iou_thresh=0.50)
    dets = merge_nearby_boxes(dets, dist_thresh=50)
    return dets


# -------------------
# Drawing & Reporting
# -------------------
def draw_detection(img, bbox, label, crack_type):
    """Draw a colored box and a filled label tag onto ``img`` in place.

    The tag sits just above the box; when the box is too close to the top
    edge for the tag to fit, it is drawn just inside the box instead so the
    text is never rendered off-image.
    """
    x1, y1, x2, y2 = bbox
    color = TYPE_COLORS.get(crack_type, TYPE_COLORS["Other"])
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

    # text background
    (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    tag_height = th + 8
    tag_top = y1 - tag_height if y1 - tag_height >= 0 else y1
    tag_bottom = tag_top + tag_height
    cv2.rectangle(img, (x1, tag_top), (x1 + tw + 6, tag_bottom), color, -1)
    cv2.putText(
        img,
        label,
        (x1 + 3, tag_bottom - 5),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,
        (0, 0, 0),
        2,
        cv2.LINE_AA,
    )


def make_report(detections):
    """Format detections as a multi-line text report.

    Args:
        detections: list of dicts with keys: type, severity, conf, bbox,
            suggestion.

    Returns:
        The report string, or ``"No cracks detected."`` for an empty list.
    """
    if not detections:
        return "No cracks detected."

    lines = ["=== Detection Report ==="]
    for i, d in enumerate(detections, 1):
        x1, y1, x2, y2 = d["bbox"]
        lines.append(
            f"[{i}] Type: {d['type']} | Severity: {d['severity']} | Score: {d['conf']:.2f} | "
            f"BBox: ({x1},{y1})–({x2},{y2})"
        )
        if d["suggestion"]:
            lines.append(f" → Repair: {d['suggestion']}")

    # Aggregates
    type_counts = Counter(d["type"] for d in detections)
    sev_counts = Counter(
        d["severity"] for d in detections if d["severity"] is not None
    )

    lines.append("\n--- Totals by Crack Type ---")
    for k, v in type_counts.items():
        lines.append(f"{k}: {v}")

    lines.append("\n--- Totals by Severity ---")
    for k, v in sev_counts.items():
        lines.append(f"{k}: {v}")

    return "\n".join(lines)


# -------------------
# Processing Functions for Gradio
# -------------------
def _annotate_frame(frame, frame_index: Optional[int] = None) -> List[Dict]:
    """Detect, classify and draw all cracks on one BGR frame (in place).

    Classification of every box happens before any drawing, so severity
    crops are never contaminated by boxes/labels painted for earlier
    detections.

    Args:
        frame: BGR image array; annotated in place.
        frame_index: when given, stored under the ``"frame"`` key of each
            detection dict.

    Returns:
        One dict per detection with keys bbox, conf, type, severity,
        suggestion (and frame, if ``frame_index`` was given).
    """
    dets = yolo_infer(frame)

    # Pass 1: classify on the untouched pixels.
    detections = []
    for idx, (x1, y1, x2, y2, conf) in enumerate(dets):
        sev = severity_infer(frame[y1:y2, x1:x2])
        record = {
            "bbox": (x1, y1, x2, y2),
            "conf": conf,
            "type": heuristic_crack_type(x1, y1, x2, y2, dets, idx),
            "severity": sev,
            "suggestion": REPAIR_SUGGESTIONS.get(sev, ""),
        }
        if frame_index is not None:
            record["frame"] = frame_index
        detections.append(record)

    # Pass 2: draw.
    for d in detections:
        label = f"{d['type']} | {d['severity']} ({d['conf']:.2f})"
        draw_detection(frame, d["bbox"], label, d["type"])

    return detections


def process_image_gradio(image):
    """Process image for Gradio interface.

    Args:
        image: RGB PIL image from the upload widget, or ``None`` if the
            button was clicked without an upload.

    Returns:
        ``(annotated_rgb_array, report_text)``; the image is ``None`` when
        no input was provided.
    """
    if image is None:
        return None, "No image provided."

    # Convert PIL image to OpenCV format
    image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    detections = _annotate_frame(image_cv)

    # Convert back to RGB for display
    result_image = cv2.cvtColor(image_cv, cv2.COLOR_BGR2RGB)
    return result_image, make_report(detections)


def process_video_gradio(video_path, progress=gr.Progress()):
    """Process video for Gradio interface.

    Every frame is annotated and written to a fresh temporary ``.mp4`` so
    concurrent sessions never overwrite each other's output.

    Args:
        video_path: path of the uploaded video (``None`` if nothing uploaded).
        progress: Gradio progress tracker.

    Returns:
        ``(output_video_path, report_text)``; the path is ``None`` when the
        input cannot be read or the output cannot be created.
    """
    if not video_path:
        return None, "Could not open video file"

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, "Could not open video file"

    writer = None
    collected = []
    frame_count = 0
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Some containers/backends report 0 (or a negative value) here.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create output video file (unique per call)
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_path, fourcc, fps, (W, H))
        if not writer.isOpened():
            try:
                os.remove(output_path)
            except OSError:
                pass  # best-effort cleanup of the empty temp file
            return None, "Could not create output video file"

        progress(0, desc="Starting video processing...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            # Update every 10 frames to reduce overhead
            if frame_count % 10 == 0:
                if total_frames > 0:
                    progress(
                        min(frame_count / total_frames, 1.0),
                        desc=f"Processing frame {frame_count}/{total_frames}",
                    )
                else:
                    # Unknown length: report steps done with no total.
                    progress(
                        (frame_count, None),
                        desc=f"Processing frame {frame_count}",
                    )

            collected.extend(_annotate_frame(frame, frame_index=frame_count))
            writer.write(frame)
    finally:
        # Release native handles even if inference raised mid-video.
        cap.release()
        if writer is not None:
            writer.release()

    # Generate report
    report = make_report(collected)
    report += "\n\n--- Video Summary ---\n"
    report += f"Total frames processed: {frame_count}\n"
    report += f"Total detections: {len(collected)}"

    progress(1.0, desc="Video processing complete!")
    return output_path, report


# -------------------
# Gradio Interface
# -------------------
def create_gradio_interface():
    """Create the Gradio interface (image tab, video tab, about tab)."""
    with gr.Blocks(title="Crack Detection System", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🏗️ Crack Detection System")
        gr.Markdown(
            "Upload an image or video to detect and analyze cracks with severity assessment and crack types."
        )

        with gr.Tab("Image Detection"):
            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(label="Upload Image", type="pil")
                    image_btn = gr.Button("Detect Cracks", variant="primary")
                with gr.Column():
                    image_output = gr.Image(label="Detection Results")
                    image_report = gr.Textbox(label="Detection Report", lines=10)

            image_btn.click(
                fn=process_image_gradio,
                inputs=image_input,
                outputs=[image_output, image_report],
            )

        with gr.Tab("Video Detection"):
            with gr.Row():
                with gr.Column():
                    video_input = gr.Video(label="Upload Video")
                    video_btn = gr.Button("Process Video", variant="primary")
                with gr.Column():
                    video_output = gr.Video(label="Processed Video")
                    video_report = gr.Textbox(label="Detection Report", lines=15)

            video_btn.click(
                fn=process_video_gradio,
                inputs=video_input,
                outputs=[video_output, video_report],
            )

        with gr.Tab("About"):
            gr.Markdown(ABOUT_MARKDOWN)

    return demo


# -------------------
# Main Execution
# -------------------
if __name__ == "__main__":
    # Pre-load models for faster first inference
    print("Loading models...")
    load_models()
    print("Models loaded successfully!")

    # Create and launch Gradio app
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True,
    )