|
import os |
|
import cv2 |
|
import numpy as np |
|
import onnxruntime as ort |
|
import tempfile |
|
import matplotlib.pyplot as plt |
|
from collections import Counter, defaultdict |
|
import gradio as gr |
|
import time |
|
from typing import List, Dict, Tuple, Optional |
|
|
|
|
|
|
|
|
|
MODEL_PATH = "best2.onnx" |
|
SEVERITY_PATH = "severity_model.onnx" |
|
PROVIDERS = ["CPUExecutionProvider"] |
|
|
|
|
|
yolo_sess = None |
|
severity_sess = None |
|
|
|
SEVERITY_LABELS = ["Mild", "Moderate", "Severe"] |
|
REPAIR_SUGGESTIONS = { |
|
"Mild": "Routine maintenance recommended.", |
|
"Moderate": "Schedule repair soon.", |
|
"Severe": "Immediate repair required!" |
|
} |
|
|
|
|
|
TYPE_COLORS = { |
|
"Transverse": (60, 200, 60), |
|
"Longitudinal": (200, 120, 0), |
|
"Alligator": (0, 165, 255), |
|
"Other": (180, 180, 180) |
|
} |
|
|
|
|
|
|
|
|
|
def load_models(): |
|
"""Load models on demand""" |
|
global yolo_sess, severity_sess |
|
|
|
if yolo_sess is None: |
|
try: |
|
yolo_sess = ort.InferenceSession(MODEL_PATH, providers=PROVIDERS) |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to load YOLO model: {e}") |
|
|
|
if severity_sess is None: |
|
try: |
|
severity_sess = ort.InferenceSession(SEVERITY_PATH, providers=PROVIDERS) |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to load severity model: {e}") |
|
|
|
return yolo_sess.get_inputs()[0].name, severity_sess.get_inputs()[0].name |
|
|
|
|
|
|
|
|
|
def severity_infer(bgr_crop): |
|
if bgr_crop is None or bgr_crop.size == 0: |
|
return None |
|
|
|
_, severity_in_name = load_models() |
|
|
|
rgb = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2RGB) |
|
rgb = cv2.resize(rgb, (224, 224)) |
|
inp = rgb.transpose(2, 0, 1)[None].astype(np.float32) / 255.0 |
|
out = severity_sess.run(None, {severity_in_name: inp})[0] |
|
cls = int(np.argmax(out, axis=1)[0]) |
|
return SEVERITY_LABELS[cls] if 0 <= cls < len(SEVERITY_LABELS) else str(cls) |
|
|
|
def nms(dets, iou_thresh=0.5): |
|
if len(dets) == 0: |
|
return [] |
|
boxes = np.array([[x1, y1, x2, y2, conf] for (x1, y1, x2, y2, conf) in dets], dtype=float) |
|
x1, y1, x2, y2, scores = boxes.T |
|
areas = (x2 - x1 + 1) * (y2 - y1 + 1) |
|
order = scores.argsort()[::-1] |
|
|
|
keep = [] |
|
while order.size > 0: |
|
i = order[0] |
|
keep.append(i) |
|
xx1 = np.maximum(x1[i], x1[order[1:]]) |
|
yy1 = np.maximum(y1[i], y1[order[1:]]) |
|
xx2 = np.minimum(x2[i], x2[order[1:]]) |
|
yy2 = np.minimum(y2[i], y2[order[1:]]) |
|
w = np.maximum(0.0, xx2 - xx1 + 1) |
|
h = np.maximum(0.0, yy2 - yy1 + 1) |
|
inter = w * h |
|
iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6) |
|
inds = np.where(iou <= iou_thresh)[0] |
|
order = order[inds + 1] |
|
return [tuple(map(lambda v: float(v) if isinstance(v, np.generic) else v, dets[i])) for i in keep] |
|
|
|
def merge_nearby_boxes(dets, dist_thresh=50): |
|
if not dets: |
|
return [] |
|
merged = [] |
|
used = set() |
|
centers = [((x1+x2)/2, (y1+y2)/2) for (x1,y1,x2,y2,_) in dets] |
|
|
|
for i, (x1, y1, x2, y2, conf) in enumerate(dets): |
|
if i in used: |
|
continue |
|
group = [(x1, y1, x2, y2, conf)] |
|
used.add(i) |
|
cx1, cy1 = centers[i] |
|
for j in range(i+1, len(dets)): |
|
if j in used: |
|
continue |
|
cx2, cy2 = centers[j] |
|
if np.hypot(cx2 - cx1, cy2 - cy1) < dist_thresh: |
|
group.append(dets[j]) |
|
used.add(j) |
|
gx1 = min(g[0] for g in group) |
|
gy1 = min(g[1] for g in group) |
|
gx2 = max(g[2] for g in group) |
|
gy2 = max(g[3] for g in group) |
|
gconf = max(g[4] for g in group) |
|
merged.append((int(gx1), int(gy1), int(gx2), int(gy2), float(gconf))) |
|
return merged |
|
|
|
def heuristic_crack_type(x1, y1, x2, y2, dets, idx): |
|
"""Transverse (wide & short), Longitudinal (tall & thin), Alligator (dense overlaps), Other.""" |
|
w = max(1, x2 - x1) |
|
h = max(1, y2 - y1) |
|
aspect_ratio = w / h |
|
|
|
|
|
overlaps = 0 |
|
area = w * h |
|
for j, (xx1, yy1, xx2, yy2, _) in enumerate(dets): |
|
if j == idx: |
|
continue |
|
inter_x1 = max(x1, xx1) |
|
inter_y1 = max(y1, yy1) |
|
inter_x2 = min(x2, xx2) |
|
inter_y2 = min(y2, yy2) |
|
inter = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1) |
|
if inter > 0.40 * area: |
|
overlaps += 1 |
|
|
|
if overlaps >= 3: |
|
return "Alligator" |
|
if aspect_ratio >= 2.2: |
|
return "Transverse" |
|
if aspect_ratio <= 0.8: |
|
return "Longitudinal" |
|
return "Other" |
|
|
|
def yolo_infer(bgr): |
|
"""Runs the ONNX YOLO head (exported variant) and decodes simple [x,y,w,h,conf] rows.""" |
|
yolo_in_name, _ = load_models() |
|
|
|
img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB) |
|
resized = cv2.resize(img, (640, 640)) |
|
inp = resized.transpose(2, 0, 1)[None].astype(np.float32) / 255.0 |
|
out = yolo_sess.run(None, {yolo_in_name: inp})[0][0] |
|
|
|
dets = [] |
|
H, W = bgr.shape[:2] |
|
for row in out: |
|
x, y, bw, bh, conf = row[:5] |
|
if conf < 0.50: |
|
continue |
|
|
|
x1 = int((x - bw/2) * W / 640) |
|
y1 = int((y - bh/2) * H / 640) |
|
x2 = int((x + bw/2) * W / 640) |
|
y2 = int((y + bh/2) * H / 640) |
|
|
|
x1, y1 = max(0, x1), max(0, y1) |
|
x2, y2 = min(W-1, x2), min(H-1, y2) |
|
if x2 > x1 and y2 > y1: |
|
dets.append((x1, y1, x2, y2, float(conf))) |
|
|
|
|
|
dets = nms(dets, iou_thresh=0.50) |
|
dets = merge_nearby_boxes(dets, dist_thresh=50) |
|
return dets |
|
|
|
|
|
|
|
|
|
def draw_detection(img, bbox, label, crack_type): |
|
x1, y1, x2, y2 = bbox |
|
color = TYPE_COLORS.get(crack_type, TYPE_COLORS["Other"]) |
|
cv2.rectangle(img, (x1, y1), (x2, y2), color, 2) |
|
|
|
(tw, th), base = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) |
|
cv2.rectangle(img, (x1, max(0, y1 - th - 8)), (x1 + tw + 6, y1), color, -1) |
|
cv2.putText(img, label, (x1 + 3, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2, cv2.LINE_AA) |
|
|
|
def make_report(detections): |
|
""" |
|
detections: list of dicts with keys: |
|
type, severity, conf, bbox, suggestion |
|
Returns nicely formatted multiline string. |
|
""" |
|
if not detections: |
|
return "No cracks detected." |
|
|
|
lines = [] |
|
lines.append("=== Detection Report ===") |
|
for i, d in enumerate(detections, 1): |
|
x1, y1, x2, y2 = d["bbox"] |
|
lines.append(f"[{i}] Type: {d['type']} | Severity: {d['severity']} | Score: {d['conf']:.2f} | " |
|
f"BBox: ({x1},{y1})โ({x2},{y2})") |
|
if d["suggestion"]: |
|
lines.append(f" โ Repair: {d['suggestion']}") |
|
|
|
|
|
type_counts = Counter(d["type"] for d in detections) |
|
sev_counts = Counter(d["severity"] for d in detections if d["severity"] is not None) |
|
|
|
lines.append("\n--- Totals by Crack Type ---") |
|
for k, v in type_counts.items(): |
|
lines.append(f"{k}: {v}") |
|
|
|
lines.append("\n--- Totals by Severity ---") |
|
for k, v in sev_counts.items(): |
|
lines.append(f"{k}: {v}") |
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
|
def process_image_gradio(image): |
|
"""Process image for Gradio interface""" |
|
|
|
image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) |
|
|
|
dets = yolo_infer(image_cv) |
|
detections = [] |
|
|
|
for idx, (x1, y1, x2, y2, conf) in enumerate(dets): |
|
crop = image_cv[y1:y2, x1:x2] |
|
sev = severity_infer(crop) |
|
ctype = heuristic_crack_type(x1, y1, x2, y2, dets, idx) |
|
suggestion = REPAIR_SUGGESTIONS.get(sev, "") |
|
label = f"{ctype} | {sev} ({conf:.2f})" |
|
draw_detection(image_cv, (x1, y1, x2, y2), label, ctype) |
|
detections.append({ |
|
"bbox": (x1, y1, x2, y2), |
|
"conf": conf, |
|
"type": ctype, |
|
"severity": sev, |
|
"suggestion": suggestion |
|
}) |
|
|
|
|
|
result_image = cv2.cvtColor(image_cv, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
report = make_report(detections) |
|
|
|
return result_image, report |
|
|
|
def process_video_gradio(video_path, progress=gr.Progress()): |
|
"""Process video for Gradio interface""" |
|
cap = cv2.VideoCapture(video_path) |
|
if not cap.isOpened(): |
|
return None, "Could not open video file" |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25 |
|
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
|
|
|
output_path = "processed_video.mp4" |
|
fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
|
writer = cv2.VideoWriter(output_path, fourcc, fps, (W, H)) |
|
|
|
collected = [] |
|
frame_count = 0 |
|
|
|
|
|
progress(0, desc="Starting video processing...") |
|
|
|
while True: |
|
ret, frame = cap.read() |
|
if not ret: |
|
break |
|
|
|
frame_count += 1 |
|
|
|
|
|
if frame_count % 10 == 0: |
|
progress(frame_count / total_frames, desc=f"Processing frame {frame_count}/{total_frames}") |
|
|
|
|
|
dets = yolo_infer(frame) |
|
frame_detections = [] |
|
|
|
for idx, (x1, y1, x2, y2, conf) in enumerate(dets): |
|
crop = frame[y1:y2, x1:x2] |
|
sev = severity_infer(crop) |
|
ctype = heuristic_crack_type(x1, y1, x2, y2, dets, idx) |
|
suggestion = REPAIR_SUGGESTIONS.get(sev, "") |
|
label = f"{ctype} | {sev} ({conf:.2f})" |
|
draw_detection(frame, (x1, y1, x2, y2), label, ctype) |
|
frame_detections.append({ |
|
"bbox": (x1, y1, x2, y2), |
|
"conf": conf, |
|
"type": ctype, |
|
"severity": sev, |
|
"suggestion": suggestion, |
|
"frame": frame_count |
|
}) |
|
|
|
collected.extend(frame_detections) |
|
writer.write(frame) |
|
|
|
cap.release() |
|
writer.release() |
|
|
|
|
|
report = make_report(collected) |
|
report += f"\n\n--- Video Summary ---\n" |
|
report += f"Total frames processed: {frame_count}\n" |
|
report += f"Total detections: {len(collected)}" |
|
|
|
progress(1.0, desc="Video processing complete!") |
|
|
|
return output_path, report |
|
|
|
|
|
|
|
|
|
def create_gradio_interface(): |
|
"""Create the Gradio interface""" |
|
|
|
with gr.Blocks(title="Crack Detection System", theme=gr.themes.Soft()) as demo: |
|
gr.Markdown("# ๐๏ธ Crack Detection System") |
|
gr.Markdown("Upload an image or video to detect and analyze cracks with severity assessment and crack types.") |
|
|
|
with gr.Tab("Image Detection"): |
|
with gr.Row(): |
|
with gr.Column(): |
|
image_input = gr.Image(label="Upload Image", type="pil") |
|
image_btn = gr.Button("Detect Cracks", variant="primary") |
|
|
|
with gr.Column(): |
|
image_output = gr.Image(label="Detection Results") |
|
image_report = gr.Textbox(label="Detection Report", lines=10) |
|
|
|
image_btn.click( |
|
fn=process_image_gradio, |
|
inputs=image_input, |
|
outputs=[image_output, image_report] |
|
) |
|
|
|
with gr.Tab("Video Detection"): |
|
with gr.Row(): |
|
with gr.Column(): |
|
video_input = gr.Video(label="Upload Video") |
|
video_btn = gr.Button("Process Video", variant="primary") |
|
|
|
with gr.Column(): |
|
video_output = gr.Video(label="Processed Video") |
|
video_report = gr.Textbox(label="Detection Report", lines=15) |
|
|
|
video_btn.click( |
|
fn=process_video_gradio, |
|
inputs=video_input, |
|
outputs=[video_output, video_report] |
|
) |
|
|
|
with gr.Tab("About"): |
|
gr.Markdown(""" |
|
## About this App |
|
|
|
This Crack Detection System uses deep learning models to: |
|
|
|
- **Detect cracks using YOLOv5** in images and videos |
|
- **Classify crack types**: Transverse, Longitudinal, Alligator, Other |
|
- **Assess severity using ResNet-18**: Mild, Moderate, Severe |
|
- **Provide repair suggestions** |
|
|
|
### How to use: |
|
1. Upload an image or video |
|
2. Click the detection button |
|
3. View results with bounding boxes and detailed report |
|
|
|
### Crack Types: |
|
- **Transverse**: Wide and short cracks perpendicular to road direction |
|
- **Longitudinal**: Tall and narrow cracks parallel to road direction |
|
- **Alligator**: Network of interconnected cracks resembling alligator skin |
|
- **Other**: Cracks that don't fit the above categories |
|
|
|
### Severity Levels: |
|
- **Mild**: Routine maintenance recommended |
|
- **Moderate**: Schedule repair soon |
|
- **Severe**: Immediate repair required! |
|
""") |
|
|
|
return demo |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
print("Loading models...") |
|
load_models() |
|
print("Models loaded successfully!") |
|
|
|
|
|
demo = create_gradio_interface() |
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=False, |
|
debug=True |
|
) |