Spaces:
Sleeping
Sleeping
import gradio as gr | |
import cv2 | |
import time | |
import os | |
import numpy as np | |
from datetime import datetime | |
from collections import Counter | |
from services.video_service import get_next_video_frame | |
from services.crack_detection_service import detect_cracks_and_potholes | |
from services.overlay_service import overlay_boxes | |
from services.metrics_service import update_metrics | |
from services.map_service import generate_map_html | |
from services.utils import simulate_gps_coordinates | |
# State management class | |
class InspectionState: | |
def __init__(self): | |
self.paused = False | |
self.frame_rate = 0.2 | |
self.frame_count = 0 | |
self.log_entries = [] | |
self.crack_counts = [] | |
self.pothole_counts = [] | |
self.severity_all = [] | |
self.last_frame = None | |
self.last_metrics = {} | |
self.last_timestamp = "" | |
self.last_detected_images = [] | |
self.detected_ids = set() | |
self.gps_coordinates = [] | |
self.CAPTURED_FRAMES_DIR = "captured_frames" | |
os.makedirs(self.CAPTURED_FRAMES_DIR, exist_ok=True) | |
state = InspectionState() | |
# Core monitor function | |
def monitor_feed(): | |
if state.paused and state.last_frame is not None: | |
frame = state.last_frame.copy() | |
metrics = state.last_metrics.copy() | |
else: | |
try: | |
frame = get_next_video_frame() | |
if frame is None: | |
state.log_entries.append("Error: Camera feed unavailable. Check video file or camera connection.") | |
return None, state.last_metrics, "\n".join(state.log_entries[-10:]), None, None, state.last_detected_images, None | |
except RuntimeError as e: | |
state.log_entries.append(f"Error: {str(e)}") | |
return None, state.last_metrics, "\n".join(state.log_entries[-10:]), None, None, state.last_detected_images, None | |
detected_items = detect_cracks_and_potholes(frame) | |
frame = overlay_boxes(frame, detected_items) | |
metrics = update_metrics(detected_items) | |
state.frame_count += 1 | |
state.last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
gps_coord = simulate_gps_coordinates(state.frame_count) | |
state.gps_coordinates.append(gps_coord) | |
# Save frames with detections, avoiding duplicates based on ID | |
for item in detected_items: | |
if item['type'] in ['crack', 'pothole']: | |
obj_id = item['id'] | |
if obj_id not in state.detected_ids: | |
captured_frame_path = os.path.join(state.CAPTURED_FRAMES_DIR, f"{item['type']}_ID{obj_id}.jpg") | |
# Ensure the frame saved to disk has the same markings as the live feed | |
marked_frame = frame.copy() | |
marked_frame = overlay_boxes(marked_frame, [item]) | |
cv2.imwrite(captured_frame_path, marked_frame) | |
state.detected_ids.add(obj_id) | |
if captured_frame_path not in state.last_detected_images: | |
state.last_detected_images.append(captured_frame_path) | |
if len(state.last_detected_images) > 100: | |
state.last_detected_images.pop(0) | |
state.last_frame = frame.copy() | |
state.last_metrics = metrics.copy() | |
# Update logs and stats | |
crack_detected = len([item for item in metrics.get('items', []) if item['type'] == 'crack']) | |
pothole_detected = len([item for item in metrics.get('items', []) if item['type'] == 'pothole']) | |
state.severity_all.extend([item['severity'] for item in metrics.get('items', []) if 'severity' in item]) | |
log_entry = f"{state.last_timestamp} - Frame {state.frame_count} - Cracks: {crack_detected} - Potholes: {pothole_detected} - GPS: {gps_coord}" | |
for item in detected_items: | |
log_entry += f"\n {item['type'].capitalize()} ID:{item['id']} - Confidence: {item['confidence']*100:.1f}% - Severity: {item['severity']}" | |
state.log_entries.append(log_entry) | |
state.crack_counts.append(crack_detected) | |
state.pothole_counts.append(pothole_detected) | |
if len(state.log_entries) > 100: | |
state.log_entries.pop(0) | |
if len(state.crack_counts) > 500: | |
state.crack_counts.pop(0) | |
state.pothole_counts.pop(0) | |
if len(state.severity_all) > 500: | |
state.severity_all.pop(0) | |
# Upscale frame for display | |
frame = cv2.resize(state.last_frame, (640, 480), interpolation=cv2.INTER_LINEAR) | |
cv2.putText(frame, f"Frame: {state.frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
cv2.putText(frame, f"{state.last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) | |
# Generate Chart.js chart HTML | |
line_chart_html = """ | |
<canvas id="lineChart" style="max-height: 200px; max-width: 100%;"></canvas> | |
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script> | |
<script> | |
const ctxLine = document.getElementById('lineChart').getContext('2d'); | |
new Chart(ctxLine, { | |
type: 'line', | |
data: { | |
labels: """ + str(list(range(max(0, len(state.crack_counts) - 50), len(state.crack_counts)))) + """, | |
datasets: [ | |
{ label: 'Cracks', data: """ + str(state.crack_counts[-50:]) + """, borderColor: 'red', fill: false }, | |
{ label: 'Potholes', data: """ + str(state.pothole_counts[-50:]) + """, borderColor: 'blue', fill: false } | |
] | |
}, | |
options: { responsive: true, maintainAspectRatio: false } | |
}); | |
</script> | |
""" | |
count = Counter(state.severity_all[-200:]) | |
pie_chart_html = """ | |
<canvas id="pieChart" style="max-height: 200px; max-width: 100%;"></canvas> | |
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script> | |
<script> | |
const ctxPie = document.getElementById('pieChart').getContext('2d'); | |
new Chart(ctxPie, { | |
type: 'pie', | |
data: { | |
labels: """ + str(list(count.keys())) + """, | |
datasets: [{ | |
data: """ + str(list(count.values())) + """, | |
backgroundColor: ['#FF6384', '#36A2EB', '#FFCE56'] | |
}] | |
}, | |
options: { responsive: true, maintainAspectRatio: false } | |
}); | |
</script> | |
""" | |
map_html = generate_map_html(state.gps_coordinates[-5:], [item for item in metrics.get('items', []) if item['type'] in ['crack', 'pothole']]) | |
return frame[:, :, ::-1], metrics, "\n".join(state.log_entries[-10:]), line_chart_html, pie_chart_html, state.last_detected_images, map_html | |
# Gradio UI with beautiful design | |
with gr.Blocks(theme=gr.themes.Soft(), css=""" | |
body { | |
background: linear-gradient(135deg, #1a1a1a, #2b2b2b); | |
color: #ffffff; | |
font-family: 'Poppins', sans-serif; | |
} | |
.header { | |
font-size: 2em; | |
text-align: center; | |
background: linear-gradient(90deg, #ff4d4d, #ff6666); | |
-webkit-background-clip: text; | |
-webkit-text-fill-color: transparent; | |
margin-bottom: 20px; | |
} | |
.gr-button { | |
margin: 5px; | |
background: linear-gradient(90deg, #ff4d4d, #ff6666); | |
color: white; | |
border: none; | |
border-radius: 12px; | |
padding: 12px 24px; | |
transition: transform 0.2s, box-shadow 0.2s; | |
box-shadow: 0 4px 12px rgba(255, 77, 77, 0.3); | |
} | |
.gr-button:hover { | |
transform: translateY(-2px); | |
box-shadow: 0 6px 16px rgba(255, 77, 77, 0.5); | |
} | |
.gr-image { | |
border-radius: 12px; | |
box-shadow: 0 4px 12px rgba(0,0,0,0.4); | |
transition: transform 0.3s; | |
} | |
.gr-image:hover { transform: scale(1.02); } | |
.gr-textbox, .gr-html { | |
background-color: #2b2b2b; | |
border: 1px solid #444; | |
border-radius: 12px; | |
padding: 15px; | |
color: #ffffff; | |
box-shadow: 0 4px 12px rgba(0,0,0,0.3); | |
} | |
#map-output { | |
height: 400px; | |
width: 100%; | |
border: 1px solid #444; | |
border-radius: 12px; | |
box-shadow: 0 4px 12px rgba(0,0,0,0.3); | |
} | |
#chart-output, #pie-output { | |
height: 200px; | |
width: 100%; | |
border: 1px solid #444; | |
border-radius: 12px; | |
background-color: #2b2b2b; | |
box-shadow: 0 4px 12px rgba(0,0,0,0.3); | |
} | |
.legend { | |
background: linear-gradient(135deg, #2b2b2b, #3b3b3b); | |
padding: 15px; | |
border-radius: 12px; | |
margin-bottom: 15px; | |
border: 1px solid #444; | |
box-shadow: 0 4px 12px rgba(0,0,0,0.3); | |
} | |
.legend span { margin-right: 20px; font-size: 16px; font-weight: 500; } | |
""") as app: | |
gr.Markdown("# 🛡️ Drone Road Inspection Dashboard", elem_classes="header") | |
# Legend for markings | |
gr.HTML(""" | |
<div class="legend"> | |
<span style="color: red;">■ Cracks (Red)</span> | |
<span style="color: blue;">■ Potholes (Blue)</span> | |
</div> | |
""") | |
status_text = gr.Markdown("**Status:** 🟢 Running") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
video_output = gr.Image(label="Live Drone Feed", width=640, height=480) | |
with gr.Column(scale=1): | |
metrics_output = gr.Textbox(label="Crack & Pothole Metrics", lines=4) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
logs_output = gr.Textbox(label="Live Logs", lines=8) | |
with gr.Column(scale=1): | |
chart_output = gr.HTML(label="Crack & Pothole Trend", elem_id="chart-output") | |
with gr.Column(scale=1): | |
pie_output = gr.HTML(label="Severity Distribution", elem_id="pie-output") | |
with gr.Row(): | |
map_output = gr.HTML(label="Crack & Pothole Locations Map", elem_id="map-output") | |
captured_images = gr.Gallery(label="Detected Cracks & Potholes (Last 100)", columns=4, rows=25) | |
with gr.Row(): | |
pause_btn = gr.Button("⏸️ Pause") | |
resume_btn = gr.Button("▶️ Resume") | |
frame_slider = gr.Slider(0.0005, 5, value=0.2, label="Frame Interval (seconds)") | |
def toggle_pause(): | |
state.paused = True | |
return "**Status:** ⏸️ Paused" | |
def toggle_resume(): | |
state.paused = False | |
return "**Status:** 🟢 Running" | |
def set_frame_rate(val): | |
state.frame_rate = val | |
pause_btn.click(toggle_pause, outputs=status_text) | |
resume_btn.click(toggle_resume, outputs=status_text) | |
frame_slider.change(set_frame_rate, inputs=[frame_slider]) | |
def streaming_loop(): | |
while True: | |
frame, metrics, logs, line_chart_html, pie_chart_html, captured, map_html = monitor_feed() | |
if frame is None: | |
yield None, str(metrics), logs, line_chart_html, pie_chart_html, captured, map_html | |
else: | |
yield frame, str(metrics), logs, line_chart_html, pie_chart_html, captured, map_html | |
time.sleep(state.frame_rate) | |
app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, captured_images, map_output]) | |
if __name__ == "__main__": | |
app.launch(share=True) |