import gradio as gr import cv2 import time import os import numpy as np from datetime import datetime from collections import Counter from services.video_service import get_next_video_frame from services.crack_detection_service import detect_cracks_and_potholes from services.overlay_service import overlay_boxes from services.metrics_service import update_metrics from services.map_service import generate_map_html from services.utils import simulate_gps_coordinates # State management class class InspectionState: def __init__(self): self.paused = False self.frame_rate = 0.2 self.frame_count = 0 self.log_entries = [] self.crack_counts = [] self.pothole_counts = [] self.severity_all = [] self.last_frame = None self.last_metrics = {} self.last_timestamp = "" self.last_detected_images = [] self.detected_ids = set() self.gps_coordinates = [] self.CAPTURED_FRAMES_DIR = "captured_frames" os.makedirs(self.CAPTURED_FRAMES_DIR, exist_ok=True) state = InspectionState() # Core monitor function def monitor_feed(): if state.paused and state.last_frame is not None: frame = state.last_frame.copy() metrics = state.last_metrics.copy() else: try: frame = get_next_video_frame() if frame is None: state.log_entries.append("Error: Camera feed unavailable. Check video file or camera connection.") return None, state.last_metrics, "\n".join(state.log_entries[-10:]), None, None, state.last_detected_images, None except RuntimeError as e: state.log_entries.append(f"Error: {str(e)}") return None, state.last_metrics, "\n".join(state.log_entries[-10:]), None, None, state.last_detected_images, None detected_items = detect_cracks_and_potholes(frame) frame = overlay_boxes(frame, detected_items) metrics = update_metrics(detected_items) state.frame_count += 1 state.last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") gps_coord = simulate_gps_coordinates(state.frame_count) state.gps_coordinates.append(gps_coord) # Save frames with detections, avoiding duplicates based on ID for item in detected_items: if item['type'] in ['crack', 'pothole']: obj_id = item['id'] if obj_id not in state.detected_ids: captured_frame_path = os.path.join(state.CAPTURED_FRAMES_DIR, f"{item['type']}_ID{obj_id}.jpg") # Ensure the frame saved to disk has the same markings as the live feed marked_frame = frame.copy() marked_frame = overlay_boxes(marked_frame, [item]) cv2.imwrite(captured_frame_path, marked_frame) state.detected_ids.add(obj_id) if captured_frame_path not in state.last_detected_images: state.last_detected_images.append(captured_frame_path) if len(state.last_detected_images) > 100: state.last_detected_images.pop(0) state.last_frame = frame.copy() state.last_metrics = metrics.copy() # Update logs and stats crack_detected = len([item for item in metrics.get('items', []) if item['type'] == 'crack']) pothole_detected = len([item for item in metrics.get('items', []) if item['type'] == 'pothole']) state.severity_all.extend([item['severity'] for item in metrics.get('items', []) if 'severity' in item]) log_entry = f"{state.last_timestamp} - Frame {state.frame_count} - Cracks: {crack_detected} - Potholes: {pothole_detected} - GPS: {gps_coord}" for item in detected_items: log_entry += f"\n {item['type'].capitalize()} ID:{item['id']} - Confidence: {item['confidence']*100:.1f}% - Severity: {item['severity']}" state.log_entries.append(log_entry) state.crack_counts.append(crack_detected) state.pothole_counts.append(pothole_detected) if len(state.log_entries) > 100: state.log_entries.pop(0) if len(state.crack_counts) > 500: state.crack_counts.pop(0) state.pothole_counts.pop(0) if len(state.severity_all) > 500: state.severity_all.pop(0) # Upscale frame for display frame = cv2.resize(state.last_frame, (640, 480), interpolation=cv2.INTER_LINEAR) cv2.putText(frame, f"Frame: {state.frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) cv2.putText(frame, f"{state.last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) # Generate Chart.js chart HTML line_chart_html = """ """ count = Counter(state.severity_all[-200:]) pie_chart_html = """ """ map_html = generate_map_html(state.gps_coordinates[-5:], [item for item in metrics.get('items', []) if item['type'] in ['crack', 'pothole']]) return frame[:, :, ::-1], metrics, "\n".join(state.log_entries[-10:]), line_chart_html, pie_chart_html, state.last_detected_images, map_html # Gradio UI with beautiful design with gr.Blocks(theme=gr.themes.Soft(), css=""" body { background: linear-gradient(135deg, #1a1a1a, #2b2b2b); color: #ffffff; font-family: 'Poppins', sans-serif; } .header { font-size: 2em; text-align: center; background: linear-gradient(90deg, #ff4d4d, #ff6666); -webkit-background-clip: text; -webkit-text-fill-color: transparent; margin-bottom: 20px; } .gr-button { margin: 5px; background: linear-gradient(90deg, #ff4d4d, #ff6666); color: white; border: none; border-radius: 12px; padding: 12px 24px; transition: transform 0.2s, box-shadow 0.2s; box-shadow: 0 4px 12px rgba(255, 77, 77, 0.3); } .gr-button:hover { transform: translateY(-2px); box-shadow: 0 6px 16px rgba(255, 77, 77, 0.5); } .gr-image { border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.4); transition: transform 0.3s; } .gr-image:hover { transform: scale(1.02); } .gr-textbox, .gr-html { background-color: #2b2b2b; border: 1px solid #444; border-radius: 12px; padding: 15px; color: #ffffff; box-shadow: 0 4px 12px rgba(0,0,0,0.3); } #map-output { height: 400px; width: 100%; border: 1px solid #444; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.3); } #chart-output, #pie-output { height: 200px; width: 100%; border: 1px solid #444; border-radius: 12px; background-color: #2b2b2b; box-shadow: 0 4px 12px rgba(0,0,0,0.3); } .legend { background: linear-gradient(135deg, #2b2b2b, #3b3b3b); padding: 15px; border-radius: 12px; margin-bottom: 15px; border: 1px solid #444; box-shadow: 0 4px 12px rgba(0,0,0,0.3); } .legend span { margin-right: 20px; font-size: 16px; font-weight: 500; } """) as app: gr.Markdown("# 🛡️ Drone Road Inspection Dashboard", elem_classes="header") # Legend for markings gr.HTML("""
■ Cracks (Red) ■ Potholes (Blue)
""") status_text = gr.Markdown("**Status:** 🟢 Running") with gr.Row(): with gr.Column(scale=3): video_output = gr.Image(label="Live Drone Feed", width=640, height=480) with gr.Column(scale=1): metrics_output = gr.Textbox(label="Crack & Pothole Metrics", lines=4) with gr.Row(): with gr.Column(scale=2): logs_output = gr.Textbox(label="Live Logs", lines=8) with gr.Column(scale=1): chart_output = gr.HTML(label="Crack & Pothole Trend", elem_id="chart-output") with gr.Column(scale=1): pie_output = gr.HTML(label="Severity Distribution", elem_id="pie-output") with gr.Row(): map_output = gr.HTML(label="Crack & Pothole Locations Map", elem_id="map-output") captured_images = gr.Gallery(label="Detected Cracks & Potholes (Last 100)", columns=4, rows=25) with gr.Row(): pause_btn = gr.Button("⏸️ Pause") resume_btn = gr.Button("▶️ Resume") frame_slider = gr.Slider(0.0005, 5, value=0.2, label="Frame Interval (seconds)") def toggle_pause(): state.paused = True return "**Status:** ⏸️ Paused" def toggle_resume(): state.paused = False return "**Status:** 🟢 Running" def set_frame_rate(val): state.frame_rate = val pause_btn.click(toggle_pause, outputs=status_text) resume_btn.click(toggle_resume, outputs=status_text) frame_slider.change(set_frame_rate, inputs=[frame_slider]) def streaming_loop(): while True: frame, metrics, logs, line_chart_html, pie_chart_html, captured, map_html = monitor_feed() if frame is None: yield None, str(metrics), logs, line_chart_html, pie_chart_html, captured, map_html else: yield frame, str(metrics), logs, line_chart_html, pie_chart_html, captured, map_html time.sleep(state.frame_rate) app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, captured_images, map_output]) if __name__ == "__main__": app.launch(share=True)