import cv2 import torch import gradio as gr import numpy as np import os import json import logging import matplotlib.pyplot as plt import csv import time from datetime import datetime from collections import Counter from typing import List, Dict, Any, Optional from ultralytics import YOLO import piexif import zipfile import base64 # Directory setup os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" logging.basicConfig(filename="app.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") CAPTURED_FRAMES_DIR = "captured_frames" OUTPUT_DIR = "outputs" FLIGHT_LOG_DIR = "flight_logs" os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True) os.makedirs(OUTPUT_DIR, exist_ok=True) os.makedirs(FLIGHT_LOG_DIR, exist_ok=True) os.chmod(CAPTURED_FRAMES_DIR, 0o777) os.chmod(OUTPUT_DIR, 0o777) os.chmod(FLIGHT_LOG_DIR, 0o777) # Global variables log_entries: List[str] = [] detected_counts: List[int] = [] detected_issues: List[str] = [] gps_coordinates: List[List[float]] = [] last_metrics: Dict[str, Any] = {} frame_count: int = 0 SAVE_IMAGE_INTERVAL = 1 MAX_IMAGES = 500 # Model setup def safe_load_yolo_model(path): torch.serialization.add_safe_globals([torch, 'ultralytics.nn.tasks.DetectionModel']) return YOLO(path) model_paths = { 'YOLOv11': './data/yolo11n.pt', 'Crack & Pothole Detector': './data/pothole.pt', 'Toll gates': './data/tollgate.pt', 'Railway Bridges': './data/bridges.pt' } models = {name: safe_load_yolo_model(path).to("cuda" if torch.cuda.is_available() else "cpu") for name, path in model_paths.items()} for name, model in models.items(): if torch.cuda.is_available(): model.half() model_colors = { 'YOLOv11': (0, 255, 0), # Green 'Crack & Pothole Detector': (255, 0, 0), # Red 'Toll gates': (0, 0, 255), # Blue 'Railway Bridges': (0, 255, 255) # Yellow } # Helper functions def zip_all_outputs(report_path: str, video_path: str, chart_path: str, map_path: str) -> str: zip_path = os.path.join(OUTPUT_DIR, f"drone_analysis_outputs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip") try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_STORED) as zipf: if os.path.exists(report_path): zipf.write(report_path, os.path.basename(report_path)) if os.path.exists(video_path): zipf.write(video_path, os.path.join("outputs", os.path.basename(video_path))) if os.path.exists(chart_path): zipf.write(chart_path, os.path.join("outputs", os.path.basename(chart_path))) if os.path.exists(map_path): zipf.write(map_path, os.path.join("outputs", os.path.basename(map_path))) for file in detected_issues: if os.path.exists(file): zipf.write(file, os.path.join("captured_frames", os.path.basename(file))) for root, _, files in os.walk(FLIGHT_LOG_DIR): for file in files: file_path = os.path.join(root, file) zipf.write(file_path, os.path.join("flight_logs", file)) log_entries.append(f"Created ZIP: {zip_path}") return zip_path except Exception as e: log_entries.append(f"Error: Failed to create ZIP: {str(e)}") return "" def generate_map(gps_coords: List[List[float]], items: List[Dict[str, Any]]) -> str: map_path = os.path.join(OUTPUT_DIR, f"map_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png") plt.figure(figsize=(4, 4)) plt.scatter([x[1] for x in gps_coords], [x[0] for x in gps_coords], c='blue', label='GPS Points') plt.title("Issue Locations Map") plt.xlabel("Longitude") plt.ylabel("Latitude") plt.legend() plt.savefig(map_path) plt.close() return map_path def write_geotag(image_path: str, gps_coord: List[float]) -> bool: try: lat = abs(gps_coord[0]) lon = abs(gps_coord[1]) lat_ref = "N" if gps_coord[0] >= 0 else "S" lon_ref = "E" if gps_coord[1] >= 0 else "W" exif_dict = piexif.load(image_path) if os.path.exists(image_path) else {"GPS": {}} exif_dict["GPS"] = { piexif.GPSIFD.GPSLatitudeRef: lat_ref, piexif.GPSIFD.GPSLatitude: ((int(lat), 1), (0, 1), (0, 1)), piexif.GPSIFD.GPSLongitudeRef: lon_ref, piexif.GPSIFD.GPSLongitude: ((int(lon), 1), (0, 1), (0, 1)) } piexif.insert(piexif.dump(exif_dict), image_path) return True except Exception as e: log_entries.append(f"Error: Failed to geotag {image_path}: {str(e)}") return False def write_flight_log(frame_count: int, gps_coord: List[float], timestamp: str) -> str: log_path = os.path.join(FLIGHT_LOG_DIR, f"flight_log_{frame_count:06d}.csv") try: with open(log_path, 'w', newline='') as csvfile: writer = csv.writer(csvfile) writer.writerow(["Frame", "Timestamp", "Latitude", "Longitude", "Speed_ms", "Satellites", "Altitude_m"]) writer.writerow([frame_count, timestamp, gps_coord[0], gps_coord[1], 5.0, 12, 60]) return log_path except Exception as e: log_entries.append(f"Error: Failed to write flight log {log_path}: {str(e)}") return "" def check_image_quality(frame: np.ndarray, input_resolution: int) -> bool: height, width, _ = frame.shape frame_resolution = width * height if frame_resolution < 2_073_600: log_entries.append(f"Frame {frame_count}: Resolution {width}x{height} below 2MP") return False if frame_resolution < input_resolution: log_entries.append(f"Frame {frame_count}: Output resolution below input") return False return True def update_metrics(detections: List[Dict[str, Any]]) -> Dict[str, Any]: counts = Counter([(det["label"], det["model"]) for det in detections]) return { "items": [{"type": k[0], "model": k[1], "count": v} for k, v in counts.items()], "total_detections": len(detections), "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } def generate_line_chart() -> Optional[str]: if not detected_counts: return None plt.figure(figsize=(4, 2)) plt.plot(detected_counts[-50:], marker='o', color='#FF8C00') plt.title("Detections Over Time") plt.xlabel("Frame") plt.ylabel("Count") plt.grid(True) plt.tight_layout() chart_path = os.path.join(OUTPUT_DIR, f"chart_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png") plt.savefig(chart_path) plt.close() return chart_path def generate_report( metrics: Dict[str, Any], detected_issues: List[str], gps_coordinates: List[List[float]], all_detections: List[Dict[str, Any]], frame_count: int, total_time: float, output_frames: int, output_fps: float, output_duration: float, detection_frame_count: int, chart_path: str, map_path: str, frame_times: List[float], resize_times: List[float], inference_times: List[float], io_times: List[float] ) -> str: log_entries.append("Generating report...") report_path = os.path.join(OUTPUT_DIR, f"drone_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html") timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') report_content = [ "", "", "
", "", "This report consolidates drone survey results for NH-44 (Km 100–150) using multiple YOLO models for detecting road defects and toll gates.
", "Total Frames Processed: {frame_count}
", f"Detection Frames: {detection_frame_count} ({detection_frame_count/frame_count*100:.1f}%)
", f"Total Detections: {metrics['total_detections']}
", "Breakdown by Model and Type:
", "Processing Time: {total_time:.1f} seconds
", f"Average Frame Time: {sum(frame_times)/len(frame_times):.1f} ms
" if frame_times else "Average Frame Time: N/A
", f"Average Resize Time: {sum(resize_times)/len(resize_times):.1f} ms
" if resize_times else "Average Resize Time: N/A
", f"Average Inference Time: {sum(inference_times)/len(inference_times):.1f} ms
" if inference_times else "Average Inference Time: N/A
", f"Average I/O Time: {sum(io_times)/len(io_times):.1f} ms
" if io_times else "Average I/O Time: N/A
", f"Timestamp: {metrics.get('timestamp', 'N/A')}
", "Summary: Road defects and toll gates detected across multiple models.
", "ZIP file contains:
", "drone_analysis_report_{timestamp}.html
: This reportoutputs/processed_output.mp4
: Processed video with annotationsoutputs/chart_{timestamp}.png
: Detection trend chartoutputs/map_{timestamp}.png
: Issue locations mapcaptured_frames/detected_.jpg
: Geotagged images for detected issuesflight_logs/flight_log_.csv
: Flight logs matching image framesNote: Images and logs share frame numbers (e.g., detected_000001.jpg
corresponds to flight_log_000001.csv
).
Total Images: {len(detected_issues)}
", f"Storage: Data Lake /project_xyz/images/{datetime.now().strftime('%Y%m%d')}
Frame | Issue Type | Model | GPS (Lat, Lon) | Timestamp | Confidence | Image Path |
---|---|---|---|---|---|---|
{detection['frame']:06d} | {detection['label']} | {detection['model']} | ({detection['gps'][0]:.6f}, {detection['gps'][1]:.6f}) | {detection['timestamp']} | {detection['conf']:.1f} | captured_frames/{os.path.basename(detection['path'])} |
Total Logs: {len(detected_issues)}
", f"Storage: Data Lake /project_xyz/flight_logs/{datetime.now().strftime('%Y%m%d')}
Frame | Timestamp | Latitude | Longitude | Speed (m/s) | Satellites | Altitude (m) | Log Path |
---|---|---|---|---|---|---|---|
{detection['frame']:06d} | {detection['timestamp']} | {detection['gps'][0]:.6f} | {detection['gps'][1]:.6f} | 5.0 | 12 | 60 | {log_path} |
Path: outputs/processed_output.mp4
", f"Frames: {output_frames}
", f"FPS: {output_fps:.1f}
", f"Duration: {output_duration:.1f} seconds
", "Detection Trend Chart: outputs/chart_{timestamp}.png
", f"Issue Locations Map: outputs/map_{timestamp}.png
", "Total Processing Time: {total_time:.1f} seconds
", "Log Entries (Last 10):
", "/project_xyz/images/{datetime.now().strftime('%Y%m%d')}
/project_xyz/flight_logs/{datetime.now().strftime('%Y%m%d')}
/project_xyz/videos/processed_output_{timestamp}.mp4
/project_xyz/dams/{datetime.now().strftime('%Y%m%d')}
Below are the embedded images from the captured frames directory showing detected issues:
", "" ]) for image_path in detected_issues: if os.path.exists(image_path): image_name = os.path.basename(image_path) try: with open(image_path, "rb") as image_file: base64_string = base64.b64encode(image_file.read()).decode('utf-8') report_content.append(f"Image: {image_name}
") report_content.append("") except Exception as e: log_entries.append(f"Error: Failed to encode image {image_name} to base64: {str(e)}") report_content.extend([ "", "" ]) try: with open(report_path, 'w') as f: f.write("\n".join(report_content)) log_entries.append(f"Report saved at: {report_path}") return report_path except Exception as e: log_entries.append(f"Error: Failed to save report: {str(e)}") return "" def process_video(video, selected_model, resize_width=1920, resize_height=1080, frame_skip=1): global frame_count, last_metrics, detected_counts, detected_issues, gps_coordinates, log_entries frame_count = 0 detected_counts.clear() detected_issues.clear() gps_coordinates.clear() log_entries.clear() last_metrics = {} if video is None: log_entries.append("Error: No video uploaded") return None, json.dumps({"error": "No video uploaded"}, indent=2), "\n".join(log_entries), [], None, None, None log_entries.append("Starting video processing...") start_time = time.time() cap = cv2.VideoCapture(video) if not cap.isOpened(): log_entries.append("Error: Could not open video file") return None, json.dumps({"error": "Could not open video file"}, indent=2), "\n".join(log_entries), [], None, None, None frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) input_resolution = frame_width * frame_height fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) log_entries.append(f"Input video: {frame_width}x{frame_height} at {fps} FPS, {total_frames} frames") out_width, out_height = resize_width, resize_height output_path = os.path.join(OUTPUT_DIR, "processed_output.mp4") out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'XVID'), fps, (out_width, out_height)) if not out.isOpened(): log_entries.append("Error: Failed to initialize video writer") cap.release() return None, json.dumps({"error": "Video writer failed"}, indent=2), "\n".join(log_entries), [], None, None, None processed_frames = 0 all_detections = [] frame_times = [] inference_times = [] resize_times = [] io_times = [] detection_frame_count = 0 output_frame_count = 0 last_annotated_frame = None disk_space_threshold = 1024 * 1024 * 1024 # Select models based on dropdown use_models = models if selected_model == "All" else {selected_model: models[selected_model]} while True: ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % frame_skip != 0: continue processed_frames += 1 frame_start = time.time() if os.statvfs(os.path.dirname(output_path)).f_frsize * os.statvfs(os.path.dirname(output_path)).f_bavail < disk_space_threshold: log_entries.append("Error: Insufficient disk space") break frame = cv2.resize(frame, (out_width, out_height)) resize_times.append((time.time() - frame_start) * 1000) # Comment out quality check to process all frames # if not check_image_quality(frame, input_resolution): # continue annotated_frame = frame.copy() frame_detections = [] inference_start = time.time() for model_name, model in use_models.items(): results = model(annotated_frame, verbose=False, conf=0.25, iou=0.45) for result in results: for box in result.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0].tolist()) class_id = int(box.cls[0]) label = f"{model.names[class_id]} - {box.conf[0]:.2f}" color = model_colors.get(model_name, (0, 255, 255)) cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2) cv2.putText(annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) detection_data = { "label": model.names[class_id], "model": model_name, "box": [x1, y1, x2, y2], "conf": float(box.conf[0]), "gps": [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)], "timestamp": f"{int(frame_count / fps // 60):02d}:{int(frame_count / fps % 60):02d}", "frame": frame_count, "path": os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg") } frame_detections.append(detection_data) inference_times.append((time.time() - inference_start) * 1000) frame_timestamp = frame_count / fps if fps > 0 else 0 timestamp_str = f"{int(frame_timestamp // 60):02d}:{int(frame_timestamp % 60):02d}" gps_coord = [17.385044 + (frame_count * 0.0001), 78.486671 + (frame_count * 0.0001)] gps_coordinates.append(gps_coord) io_start = time.time() if frame_detections: detection_frame_count += 1 if detection_frame_count % SAVE_IMAGE_INTERVAL == 0: captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count:06d}.jpg") if cv2.imwrite(captured_frame_path, annotated_frame): if write_geotag(captured_frame_path, gps_coord): detected_issues.append(captured_frame_path) if len(detected_issues) > MAX_IMAGES: os.remove(detected_issues.pop(0)) else: log_entries.append(f"Frame {frame_count}: Geotagging failed") else: log_entries.append(f"Error: Failed to save frame at {captured_frame_path}") write_flight_log(frame_count, gps_coord, timestamp_str) io_times.append((time.time() - io_start) * 1000) out.write(annotated_frame) output_frame_count += 1 last_annotated_frame = annotated_frame if frame_skip > 1: for _ in range(frame_skip - 1): out.write(annotated_frame) output_frame_count += 1 detected_counts.append(len(frame_detections)) all_detections.extend(frame_detections) for detection in frame_detections: log_entries.append(f"Frame {frame_count} at {timestamp_str}: Detected {detection['label']} (Model: {detection['model']}) with confidence {detection['conf']:.2f}") frame_times.append((time.time() - frame_start) * 1000) if len(log_entries) > 50: log_entries.pop(0) if time.time() - start_time > 600: log_entries.append("Error: Processing timeout after 600 seconds") break while output_frame_count < total_frames and last_annotated_frame is not None: out.write(last_annotated_frame) output_frame_count += 1 last_metrics = update_metrics(all_detections) out.release() cap.release() cap = cv2.VideoCapture(output_path) if not cap.isOpened(): log_entries.append("Error: Failed to open output video for verification") output_path = None else: output_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) output_fps = cap.get(cv2.CAP_PROP_FPS) output_duration = output_frames / output_fps if output_fps > 0 else 0 cap.release() log_entries.append(f"Output video: {output_frames} frames, {output_fps:.2f} FPS, {output_duration:.2f} seconds") total_time = time.time() - start_time log_entries.append(f"Processing completed in {total_time:.2f} seconds") chart_path = generate_line_chart() map_path = generate_map(gps_coordinates[-5:], all_detections) report_path = generate_report( last_metrics, detected_issues, gps_coordinates, all_detections, frame_count, total_time, output_frames, output_fps, output_duration, detection_frame_count, chart_path, map_path, frame_times, resize_times, inference_times, io_times ) output_zip_path = zip_all_outputs(report_path, output_path, chart_path, map_path) return ( output_path, json.dumps(last_metrics, indent=2), "\n".join(log_entries[-10:]), detected_issues, chart_path, map_path, output_zip_path ) with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange")) as iface: gr.Markdown("# NHAI Road Defect Detection Dashboard") with gr.Row(): with gr.Column(scale=3): video_input = gr.Video(label="Upload Video") model_dropdown = gr.Dropdown( choices=["All"] + list(model_paths.keys()), label="Select YOLO Model(s)", value="All" ) width_slider = gr.Slider(320, 1920, value=1920, label="Output Width", step=1) height_slider = gr.Slider(240, 1080, value=1080, label="Output Height", step=1) skip_slider = gr.Slider(1, 20, value=1, label="Frame Skip", step=1) process_btn = gr.Button("Process Video", variant="primary") with gr.Column(scale=1): metrics_output = gr.Textbox(label="Detection Metrics", lines=5, interactive=False) with gr.Row(): video_output = gr.Video(label="Processed Video") issue_gallery = gr.Gallery(label="Detected Issues", columns=4, height="auto", object_fit="contain") with gr.Row(): chart_output = gr.Image(label="Detection Trend") map_output = gr.Image(label="Issue Locations Map") with gr.Row(): logs_output = gr.Textbox(label="Logs", lines=5, interactive=False) with gr.Row(): gr.Markdown("## Download Results") with gr.Row(): output_zip_download = gr.File(label="Download All Outputs (ZIP)") process_btn.click( fn=process_video, inputs=[video_input, model_dropdown, width_slider, height_slider, skip_slider], outputs=[ video_output, metrics_output, logs_output, issue_gallery, chart_output, map_output, output_zip_download ] ) if __name__ == "__main__": iface.launch()