trainmodel3 / app.py
nagasurendra's picture
Update app.py
38665fe verified
import cv2
import torch
import gradio as gr
import numpy as np
import os
import json
import logging
import matplotlib.pyplot as plt
import csv
import time
from datetime import datetime
from collections import Counter
from typing import List, Dict, Any, Optional
from ultralytics import YOLO
import piexif
import zipfile
import base64
# Directory setup
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
logging.basicConfig(filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
CAPTURED_FRAMES_DIR = "captured_frames"
OUTPUT_DIR = "outputs"
FLIGHT_LOG_DIR = "flight_logs"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(FLIGHT_LOG_DIR, exist_ok=True)
os.chmod(CAPTURED_FRAMES_DIR, 0o777)
os.chmod(OUTPUT_DIR, 0o777)
os.chmod(FLIGHT_LOG_DIR, 0o777)
# Global variables
log_entries: List[str] = []
detected_counts: List[int] = []
detected_issues: List[str] = []
gps_coordinates: List[List[float]] = []
last_metrics: Dict[str, Any] = {}
frame_count: int = 0
SAVE_IMAGE_INTERVAL = 1
MAX_IMAGES = 500
# Model setup
def safe_load_yolo_model(path):
torch.serialization.add_safe_globals([torch, 'ultralytics.nn.tasks.DetectionModel'])
return YOLO(path)
model_paths = {
'YOLOv11': './data/yolo11n.pt',
'Crack & Pothole Detector': './data/pothole.pt',
'Toll gates': './data/tollgate.pt',
'Railway Bridges': './data/bridges.pt'
}
models = {name: safe_load_yolo_model(path).to("cuda" if torch.cuda.is_available() else "cpu") for name, path in model_paths.items()}
for name, model in models.items():
if torch.cuda.is_available():
model.half()
model_colors = {
'YOLOv11': (0, 255, 0), # Green
'Crack & Pothole Detector': (255, 0, 0), # Red
'Toll gates': (0, 0, 255), # Blue
'Railway Bridges': (0, 255, 255) # Yellow
}
# Helper functions
def zip_all_outputs(report_path: str, video_path: str, chart_path: str, map_path: str) -> str:
zip_path = os.path.join(OUTPUT_DIR, f"drone_analysis_outputs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip")
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_STORED) as zipf:
if os.path.exists(report_path):
zipf.write(report_path, os.path.basename(report_path))
if os.path.exists(video_path):
zipf.write(video_path, os.path.join("outputs", os.path.basename(video_path)))
if os.path.exists(chart_path):
zipf.write(chart_path, os.path.join("outputs", os.path.basename(chart_path)))
if os.path.exists(map_path):
zipf.write(map_path, os.path.join("outputs", os.path.basename(map_path)))
for file in detected_issues:
if os.path.exists(file):
zipf.write(file, os.path.join("captured_frames", os.path.basename(file)))
for root, _, files in os.walk(FLIGHT_LOG_DIR):
for file in files:
file_path = os.path.join(root, file)
zipf.write(file_path, os.path.join("flight_logs", file))
log_entries.append(f"Created ZIP: {zip_path}")
return zip_path
except Exception as e:
log_entries.append(f"Error: Failed to create ZIP: {str(e)}")
return ""
def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str:
map_path = os.path.join(OUTPUT_DIR, f"map_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.figure(figsize=(4, 4))
plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points')
plt.title("Issue Locations Map")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.legend()
plt.savefig(map_path)
plt.close()
return map_path
def write_geotag(image_path: str, gps_coord: List[float]) -> bool:
try:
lat = abs(gps_coord[0])
lon = abs(gps_coord[1])
lat_ref = "N" if gps_coord[0] >= 0 else "S"
lon_ref = "E" if gps_coord[1] >= 0 else "W"
exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}}
exif_dict["GPS"] = {
piexif.GPSIFD.GPSLatitudeRef: lat_ref,
piexif.GPSIFD.GPSLatitude: ((int(lat), 1), (0, 1), (0, 1)),
piexif.GPSIFD.GPSLongitudeRef: lon_ref,
piexif.GPSIFD.GPSLongitude: ((int(lon), 1), (0, 1), (0, 1))
}
piexif.insert(piexif.dump(exif_dict), image_path)
return True
except Exception as e:
log_entries.append(f"Error: Failed to geotag {image_path}: {str(e)}")
return False
def write_flight_log(frame_count: int, gps_coord: List[float], timestamp: str) -> str:
log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv")
try:
with open(log_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"])
writer.writerow([frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60])
return log_path
except Exception as e:
log_entries.append(f"Error: Failed to write flight log {log_path}: {str(e)}")
return ""
def check_image_quality(frame: np.ndarray, input_resolution: int) -> bool:
height, width, _ = frame.shape
frame_resolution = width * height
if frame_resolution < 2_073_600:
log_entries.append(f"Frame {frame_count}: Resolution {width}x{height} below 2MP")
return False
if frame_resolution < input_resolution:
log_entries.append(f"Frame {frame_count}: Output resolution below input")
return False
return True
def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]:
counts = Counter([(det["label"], det["model"]) for det in detections])
return {
"items": [{"type": k[0], "model": k[1], "count": v} for k, v in counts.items()],
"total_detections": len(detections),
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
def generate_line_chart() -> Optional[str]:
if not detected_counts:
return None
plt.figure(figsize=(4, 2))
plt.plot(detected_counts[-50:], marker='o', color='#FF8C00')
plt.title("Detections Over Time")
plt.xlabel("Frame")
plt.ylabel("Count")
plt.grid(True)
plt.tight_layout()
chart_path = os.path.join(OUTPUT_DIR, f"chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(chart_path)
plt.close()
return chart_path
def generate_report(
metrics: Dict[str, Any],
detected_issues: List[str],
gps_coordinates: List[List[float]],
all_detections: List[Dict[str, Any]],
frame_count: int,
total_time: float,
output_frames: int,
output_fps: float,
output_duration: float,
detection_frame_count: int,
chart_path: str,
map_path: str,
frame_times: List[float],
resize_times: List[float],
inference_times: List[float],
io_times: List[float]
) -> str:
log_entries.append("Generating report...")
report_path = os.path.join(OUTPUT_DIR, f"drone_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_content = [
"<!DOCTYPE html>",
"<html lang='en'>",
"<head>",
"<meta charset='UTF-8'>",
"<title>NHAI Drone Survey Analysis Report</title>",
"<style>",
"body { font-family: Arial, sans-serif; margin: 40px; }",
"h1, h2, h3 { color: #333; }",
"ul { margin-left: 20px; }",
"table { border-collapse: collapse; width: 100%; margin: 10px 0; }",
"th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }",
"th { background-color: #f2f2f2; }",
"img { max-width: 600px; height: auto; margin: 10px 0; }",
"p.caption { font-weight: bold; margin: 5px 0; }",
"</style>",
"</head>",
"<body>",
"<h1>NHAI Drone Survey Analysis Report</h1>",
"<h2>Project Details</h2>",
"<ul>",
"<li><strong>Project Name:</strong> NH-44 Delhi-Hyderabad Section (Package XYZ)</li>",
"<li><strong>Highway Section:</strong> Km 100 to Km 150</li>",
"<li><strong>State:</strong> Telangana</li>",
"<li><strong>Region:</strong> South</li>",
f"<li><strong>Survey Date:</strong> {datetime.now().strftime('%Y-%m-%d')}</li>",
"<li><strong>Drone Service Provider:</strong> ABC Drone Services Pvt. Ltd.</li>",
"<li><strong>Technology Service Provider:</strong> XYZ AI Analytics Ltd.</li>",
f"<li><strong>Work Order Reference:</strong> Data Lake WO-{datetime.now().strftime('%Y%m%d')}-XYZ</li>",
"<li><strong>Report Prepared By:</strong> Nagasurendra, Data Analyst</li>",
f"<li><strong>Report Date:</strong> {datetime.now().strftime('%Y-%m-%d')}</li>",
"</ul>",
"<h2>1. Introduction</h2>",
"<p>This report consolidates drone survey results for NH-44 (Km 100–150) using multiple YOLO models for detecting road defects and toll gates.</p>",
"<h2>2. Drone Survey Metadata</h2>",
"<ul>",
"<li><strong>Drone Speed:</strong> 5 m/s</li>",
"<li><strong>Drone Height:</strong> 60 m</li>",
"<li><strong>Camera Sensor:</strong> RGB, 12 MP</li>",
"<li><strong>Recording Type:</strong> JPEG, 90° nadir</li>",
"<li><strong>Image Overlap:</strong> 85%</li>",
"<li><strong>Flight Pattern:</strong> Single lap, ROW centered</li>",
"<li><strong>Geotagging:</strong> Enabled</li>",
"<li><strong>Satellite Lock:</strong> 12 satellites</li>",
"<li><strong>Terrain Follow Mode:</strong> Enabled</li>",
"</ul>",
"<h2>3. Quality Check Results</h2>",
"<ul>",
"<li><strong>Resolution:</strong> 1920x1080</li>",
"<li><strong>Overlap:</strong> 85%</li>",
"<li><strong>Camera Angle:</strong> 90° nadir</li>",
"<li><strong>Drone Speed:</strong> ≤ 5 m/s</li>",
"<li><strong>Geotagging:</strong> 100% compliant</li>",
"<li><strong>QC Status:</strong> Passed</li>",
"</ul>",
"<h2>4. AI/ML Analytics</h2>",
f"<p><strong>Total Frames Processed:</strong> {frame_count}</p>",
f"<p><strong>Detection Frames:</strong> {detection_frame_count} ({detection_frame_count/frame_count*100:.1f}%)</p>",
f"<p><strong>Total Detections:</strong> {metrics['total_detections']}</p>",
"<p><strong>Breakdown by Model and Type:</strong></p>",
"<ul>"
]
for item in metrics.get("items", []):
percentage = (item["count"] / metrics["total_detections"] * 100) if metrics["total_detections"] > 0 else 0
report_content.append(f"<li>{item['type']} (Model: {item['model']}): {item['count']} ({percentage:.1f}%)</li>")
report_content.extend([
"</ul>",
f"<p><strong>Processing Time:</strong> {total_time:.1f} seconds</p>",
f"<p><strong>Average Frame Time:</strong> {sum(frame_times)/len(frame_times):.1f} ms</p>" if frame_times else "<p><strong>Average Frame Time:</strong> N/A</p>",
f"<p><strong>Average Resize Time:</strong> {sum(resize_times)/len(resize_times):.1f} ms</p>" if resize_times else "<p><strong>Average Resize Time:</strong> N/A</p>",
f"<p><strong>Average Inference Time:</strong> {sum(inference_times)/len(inference_times):.1f} ms</p>" if inference_times else "<p><strong>Average Inference Time:</strong> N/A</p>",
f"<p><strong>Average I/O Time:</strong> {sum(io_times)/len(io_times):.1f} ms</p>" if io_times else "<p><strong>Average I/O Time:</strong> N/A</p>",
f"<p><strong>Timestamp:</strong> {metrics.get('timestamp', 'N/A')}</p>",
"<p><strong>Summary:</strong> Road defects and toll gates detected across multiple models.</p>",
"<h2>5. Output File Structure</h2>",
"<p>ZIP file contains:</p>",
"<ul>",
f"<li><code>drone_analysis_report_{timestamp}.html</code>: This report</li>",
"<li><code>outputs/processed_output.mp4</code>: Processed video with annotations</li>",
f"<li><code>outputs/chart_{timestamp}.png</code>: Detection trend chart</li>",
f"<li><code>outputs/map_{timestamp}.png</code>: Issue locations map</li>",
"<li><code>captured_frames/detected_<frame>.jpg</code>: Geotagged images for detected issues</li>",
"<li><code>flight_logs/flight_log_<frame>.csv</code>: Flight logs matching image frames</li>",
"</ul>",
"<p><strong>Note:</strong> Images and logs share frame numbers (e.g., <code>detected_000001.jpg</code> corresponds to <code>flight_log_000001.csv</code>).</p>",
"<h2>6. Geotagged Images</h2>",
f"<p><strong>Total Images:</strong> {len(detected_issues)}</p>",
f"<p><strong>Storage:</strong> Data Lake <code>/project_xyz/images/{datetime.now().strftime('%Y%m%d')}</code></p>",
"<table>",
"<tr><th>Frame</th><th>Issue Type</th><th>Model</th><th>GPS (Lat, Lon)</th><th>Timestamp</th><th>Confidence</th><th>Image Path</th></tr>"
])
for detection in all_detections[:100]:
report_content.append(
f"<tr><td>{detection['frame']:06d}</td><td>{detection['label']}</td><td>{detection['model']}</td><td>({detection['gps'][0]:.6f}, {detection['gps'][1]:.6f})</td><td>{detection['timestamp']}</td><td>{detection['conf']:.1f}</td><td>captured_frames/{os.path.basename(detection['path'])}</td></tr>"
)
report_content.extend([
"</table>",
"<h2>7. Flight Logs</h2>",
f"<p><strong>Total Logs:</strong> {len(detected_issues)}</p>",
f"<p><strong>Storage:</strong> Data Lake <code>/project_xyz/flight_logs/{datetime.now().strftime('%Y%m%d')}</code></p>",
"<table>",
"<tr><th>Frame</th><th>Timestamp</th><th>Latitude</th><th>Longitude</th><th>Speed (m/s)</th><th>Satellites</th><th>Altitude (m)</th><th>Log Path</th></tr>"
])
for detection in all_detections[:100]:
log_path = f"flight_logs/flight_log_{detection['frame']:06d}.csv"
report_content.append(
f"<tr><td>{detection['frame']:06d}</td><td>{detection['timestamp']}</td><td>{detection['gps'][0]:.6f}</td><td>{detection['gps'][1]:.6f}</td><td>5.0</td><td>12</td><td>60</td><td>{log_path}</td></tr>"
)
report_content.extend([
"</table>",
"<h2>8. Processed Video</h2>",
f"<p><strong>Path:</strong> outputs/processed_output.mp4</p>",
f"<p><strong>Frames:</strong> {output_frames}</p>",
f"<p><strong>FPS:</strong> {output_fps:.1f}</p>",
f"<p><strong>Duration:</strong> {output_duration:.1f} seconds</p>",
"<h2>9. Visualizations</h2>",
f"<p><strong>Detection Trend Chart:</strong> outputs/chart_{timestamp}.png</p>",
f"<p><strong>Issue Locations Map:</strong> outputs/map_{timestamp}.png</p>",
"<h2>10. Processing Timestamps</h2>",
f"<p><strong>Total Processing Time:</strong> {total_time:.1f} seconds</p>",
"<p><strong>Log Entries (Last 10):</strong></p>",
"<ul>"
])
for entry in log_entries[-10:]:
report_content.append(f"<li>{entry}</li>")
report_content.extend([
"</ul>",
"<h2>11. Stakeholder Validation</h2>",
"<ul>",
"<li><strong>AE/IE Comments:</strong> [Pending]</li>",
"<li><strong>PD/RO Comments:</strong> [Pending]</li>",
"</ul>",
"<h2>12. Recommendations</h2>",
"<ul>",
"<li>Repair potholes in high-traffic areas.</li>",
"<li>Seal cracks to prevent further degradation.</li>",
"<li>Inspect detected toll gates for compliance.</li>",
"</ul>",
"<h2>13. Data Lake References</h2>",
"<ul>",
f"<li><strong>Images:</strong> <code>/project_xyz/images/{datetime.now().strftime('%Y%m%d')}</code></li>",
f"<li><strong>Flight Logs:</strong> <code>/project_xyz/flight_logs/{datetime.now().strftime('%Y%m%d')}</code></li>",
f"<li><strong>Video:</strong> <code>/project_xyz/videos/processed_output_{timestamp}.mp4</code></li>",
f"<li><strong>DAMS Dashboard:</strong> <code>/project_xyz/dams/{datetime.now().strftime('%Y%m%d')}</code></li>",
"</ul>",
"<h2>14. Captured Images</h2>",
"<p>Below are the embedded images from the captured frames directory showing detected issues:</p>",
""
])
for image_path in detected_issues:
if os.path.exists(image_path):
image_name = os.path.basename(image_path)
try:
with open(image_path, "rb") as image_file:
base64_string = base64.b64encode(image_file.read()).decode('utf-8')
report_content.append(f"<img src='data:image/jpeg;base64,{base64_string}' alt='{image_name}'>")
report_content.append(f"<p class='caption'>Image: {image_name}</p>")
report_content.append("")
except Exception as e:
log_entries.append(f"Error: Failed to encode image {image_name} to base64: {str(e)}")
report_content.extend([
"</body>",
"</html>"
])
try:
with open(report_path, 'w') as f:
f.write("\n".join(report_content))
log_entries.append(f"Report saved at: {report_path}")
return report_path
except Exception as e:
log_entries.append(f"Error: Failed to save report: {str(e)}")
return ""
def process_video(video, selected_model, resize_width=1920, resize_height=1080, frame_skip=1):
global frame_count, last_metrics, detected_counts, detected_issues, gps_coordinates, log_entries
frame_count = 0
detected_counts.clear()
detected_issues.clear()
gps_coordinates.clear()
log_entries.clear()
last_metrics = {}
if video is None:
log_entries.append("Error: No video uploaded")
return None, json.dumps({"error": "No video uploaded"}, indent=2), "\n".join(log_entries), [], None, None, None
log_entries.append("Starting video processing...")
start_time = time.time()
cap = cv2.VideoCapture(video)
if not cap.isOpened():
log_entries.append("Error: Could not open video file")
return None, json.dumps({"error": "Could not open video file"}, indent=2), "\n".join(log_entries), [], None, None, None
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
input_resolution = frame_width * frame_height
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
log_entries.append(f"Input video: {frame_width}x{frame_height} at {fps} FPS, {total_frames} frames")
out_width, out_height = resize_width, resize_height
output_path = os.path.join(OUTPUT_DIR, "processed_output.mp4")
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'XVID'), fps, (out_width, out_height))
if not out.isOpened():
log_entries.append("Error: Failed to initialize video writer")
cap.release()
return None, json.dumps({"error": "Video writer failed"}, indent=2), "\n".join(log_entries), [], None, None, None
processed_frames = 0
all_detections = []
frame_times = []
inference_times = []
resize_times = []
io_times = []
detection_frame_count = 0
output_frame_count = 0
last_annotated_frame = None
disk_space_threshold = 1024 * 1024 * 1024
# Select models based on dropdown
use_models = models if selected_model == "All" else {selected_model: models[selected_model]}
while True:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
if frame_count % frame_skip != 0:
continue
processed_frames += 1
frame_start = time.time()
if os.statvfs(os.path.dirname(output_path)).f_frsize * os.statvfs(os.path.dirname(output_path)).f_bavail < disk_space_threshold:
log_entries.append("Error: Insufficient disk space")
break
frame = cv2.resize(frame, (out_width, out_height))
resize_times.append((time.time() - frame_start) * 1000)
# Comment out quality check to process all frames
# if not check_image_quality(frame, input_resolution):
# continue
annotated_frame = frame.copy()
frame_detections = []
inference_start = time.time()
for model_name, model in use_models.items():
results = model(annotated_frame, verbose=False, conf=0.25, iou=0.45)
for result in results:
for box in result.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
class_id = int(box.cls[0])
label = f"{model.names[class_id]} - {box.conf[0]:.2f}"
color = model_colors.get(model_name, (0, 255, 255))
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
cv2.putText(annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
detection_data = {
"label": model.names[class_id],
"model": model_name,
"box": [x1, y1, x2, y2],
"conf": float(box.conf[0]),
"gps": [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)],
"timestamp": f"{int(frame_count / fps // 60):02d}:{int(frame_count / fps % 60):02d}",
"frame": frame_count,
"path": os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg")
}
frame_detections.append(detection_data)
inference_times.append((time.time() - inference_start) * 1000)
frame_timestamp = frame_count / fps if fps > 0 else 0
timestamp_str = f"{int(frame_timestamp // 60):02d}:{int(frame_timestamp % 60):02d}"
gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)]
gps_coordinates.append(gps_coord)
io_start = time.time()
if frame_detections:
detection_frame_count += 1
if detection_frame_count % SAVE_IMAGE_INTERVAL == 0:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg")
if cv2.imwrite(captured_frame_path, annotated_frame):
if write_geotag(captured_frame_path, gps_coord):
detected_issues.append(captured_frame_path)
if len(detected_issues) > MAX_IMAGES:
os.remove(detected_issues.pop(0))
else:
log_entries.append(f"Frame {frame_count}: Geotagging failed")
else:
log_entries.append(f"Error: Failed to save frame at {captured_frame_path}")
write_flight_log(frame_count, gps_coord, timestamp_str)
io_times.append((time.time() - io_start) * 1000)
out.write(annotated_frame)
output_frame_count += 1
last_annotated_frame = annotated_frame
if frame_skip > 1:
for _ in range(frame_skip - 1):
out.write(annotated_frame)
output_frame_count += 1
detected_counts.append(len(frame_detections))
all_detections.extend(frame_detections)
for detection in frame_detections:
log_entries.append(f"Frame {frame_count} at {timestamp_str}: Detected {detection['label']} (Model: {detection['model']}) with confidence {detection['conf']:.2f}")
frame_times.append((time.time() - frame_start) * 1000)
if len(log_entries) > 50:
log_entries.pop(0)
if time.time() - start_time > 600:
log_entries.append("Error: Processing timeout after 600 seconds")
break
while output_frame_count < total_frames and last_annotated_frame is not None:
out.write(last_annotated_frame)
output_frame_count += 1
last_metrics = update_metrics(all_detections)
out.release()
cap.release()
cap = cv2.VideoCapture(output_path)
if not cap.isOpened():
log_entries.append("Error: Failed to open output video for verification")
output_path = None
else:
output_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
output_fps = cap.get(cv2.CAP_PROP_FPS)
output_duration = output_frames / output_fps if output_fps > 0 else 0
cap.release()
log_entries.append(f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds")
total_time = time.time() - start_time
log_entries.append(f"Processing completed in {total_time:.2f} seconds")
chart_path = generate_line_chart()
map_path = generate_map(gps_coordinates[-5:], all_detections)
report_path = generate_report(
last_metrics,
detected_issues,
gps_coordinates,
all_detections,
frame_count,
total_time,
output_frames,
output_fps,
output_duration,
detection_frame_count,
chart_path,
map_path,
frame_times,
resize_times,
inference_times,
io_times
)
output_zip_path = zip_all_outputs(report_path, output_path, chart_path, map_path)
return (
output_path,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_issues,
chart_path,
map_path,
output_zip_path
)
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface:
gr.Markdown("# NHAI Road Defect Detection Dashboard")
with gr.Row():
with gr.Column(scale=3):
video_input = gr.Video(label="Upload Video")
model_dropdown = gr.Dropdown(
choices=["All"] + list(model_paths.keys()),
label="Select YOLO Model(s)",
value="All"
)
width_slider = gr.Slider(320, 1920, value=1920, label="Output Width", step=1)
height_slider = gr.Slider(240, 1080, value=1080, label="Output Height", step=1)
skip_slider = gr.Slider(1, 20, value=1, label="Frame Skip", step=1)
process_btn = gr.Button("Process Video", variant="primary")
with gr.Column(scale=1):
metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False)
with gr.Row():
video_output = gr.Video(label="Processed Video")
issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto", object_fit="contain")
with gr.Row():
chart_output = gr.Image(label="Detection Trend")
map_output = gr.Image(label="Issue Locations Map")
with gr.Row():
logs_output = gr.Textbox(label="Logs", lines=5, interactive=False)
with gr.Row():
gr.Markdown("## Download Results")
with gr.Row():
output_zip_download = gr.File(label="Download All Outputs (ZIP)")
process_btn.click(
fn=process_video,
inputs=[video_input, model_dropdown, width_slider, height_slider, skip_slider],
outputs=[
video_output,
metrics_output,
logs_output,
issue_gallery,
chart_output,
map_output,
output_zip_download
]
)
if __name__ == "__main__":
iface.launch()