Spaces:
Sleeping
Sleeping
File size: 10,730 Bytes
447da48 60b383e 447da48 a7ec466 447da48 a7ec466 ba6a8ea 935769d b9d2bb7 935769d b9d2bb7 935769d 447da48 b9d2bb7 935769d ba6a8ea 447da48 a7ec466 8ae9de4 7c9f785 a7ec466 d023885 8ae9de4 447da48 8ae9de4 a7ec466 e0dcec0 8ae9de4 447da48 a7ec466 8ae9de4 d023885 8ae9de4 7c9f785 d023885 7c9f785 ba6a8ea d023885 447da48 a7ec466 3a175cd 447da48 8ae9de4 935769d 60b383e d023885 447da48 60b383e 447da48 8ae9de4 60b383e 935769d 60b383e 8ae9de4 935769d 447da48 8ae9de4 447da48 8ae9de4 447da48 8ae9de4 d918bfb 6bf90e5 d918bfb 6bf90e5 d918bfb 6bf90e5 d023885 447da48 6bf90e5 d918bfb 0534bd0 e5303f6 d918bfb e5303f6 d918bfb 6bf90e5 447da48 9501d84 447da48 9501d84 447da48 9501d84 447da48 d918bfb e5303f6 447da48 9501d84 b6f77f0 1365143 d918bfb f131341 5101617 8977de8 447da48 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# app.py
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
import cv2 # Retained for detector compatibility
# Assuming the factory and processor are in the src directory
from src.detection.factory import get_detector
# ───────────────────────────── logging
# Configure the root logger once for the whole app: INFO level, time-only stamps.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
# Load variables from a local .env file (if present) before anything reads them.
load_dotenv()

try:
    with open("config.yaml", encoding="utf-8") as f:
        CFG = yaml.safe_load(f)
except FileNotFoundError:
    logging.error("FATAL: config.yaml not found. Please ensure the file exists.")
    CFG = None
else:
    # safe_load yields None for an empty document and may yield a list/scalar
    # for a malformed one; every later use of CFG calls .get(), so anything
    # that is not a mapping is rejected here instead of crashing further down.
    if not isinstance(CFG, dict):
        logging.error("config.yaml is empty or not a mapping. Falling back to defaults.")
        CFG = None

if CFG is None:
    # Create a dummy CFG to prevent crashing the app on load
    CFG = {"alerting": {}, "geometric_settings": {}}

# Build the detector selected by the configuration (factory lives in src/).
detector = get_detector(CFG)
# ───────────────────────────── Alert Manager Class (Unchanged)
class AlertManager:
    """Decide when the audible drowsiness alert should be played.

    The manager loads one alert sound at construction time and, on every
    call to :meth:`trigger_alert`, returns that sound at most once per
    cooldown period for as long as the reported level is not ``"Awake"``.

    Attributes:
        cooldown_seconds: Minimum time between two alerts, read from the
            ``alert_cooldown_seconds`` config key (default 5).
        last_alert_time: ``time.monotonic()`` value of the most recent
            alert, or ``-inf`` if none has fired yet.
        alert_data: Samples read from the alert sound file, or ``None``
            when no sound could be loaded.
        sample_rate: Sample rate reported for the sound file, or ``None``
            when no sound could be loaded.
    """

    def __init__(self, config):
        """Read settings from *config* and try to load the alert sound.

        Args:
            config: Mapping that may contain ``alert_cooldown_seconds`` and
                ``alert_sound_path``.
        """
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        # time.monotonic() has an undefined reference point, so a start value
        # of 0 could suppress the first alert when the clock reads less than
        # the cooldown. -inf makes the first alert always eligible.
        self.last_alert_time = float("-inf")
        self.alert_data = None
        self.sample_rate = None
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        """Load the alert sound from *wav_path* into the instance.

        On any failure (missing path, unreadable file) both ``alert_data``
        and ``sample_rate`` are left as ``None`` so alerts stay silent.

        Args:
            wav_path: Path of the sound file, or a falsy value for "none".
        """
        # Reset both attributes up front so every exit path leaves them in a
        # consistent state.
        self.alert_data = None
        self.sample_rate = None

        if not wav_path or not os.path.exists(wav_path):
            logging.warning(f"Alert sound not found at '{wav_path}'. Alerts will be silent.")
            return

        try:
            data, sr = sf.read(wav_path, dtype="int16")
        except Exception as e:
            # Best effort: a broken sound file must not stop the app.
            logging.error(f"Failed to load alert sound: {e}")
            return

        self.alert_data = data
        self.sample_rate = sr
        logging.info(f"Loaded alert sound: {wav_path}")

    def trigger_alert(self, level, lighting):
        """Return the alert sound if an alert is due, otherwise ``None``.

        An alert is due when *level* is anything other than ``"Awake"``,
        *lighting* is not ``"Low"``, a sound is loaded, and more than
        ``cooldown_seconds`` have elapsed since the previous alert. While
        those conditions persist the alert repeats once per cooldown.

        NOTE(review): every level other than ``"Awake"`` counts as drowsy,
        including any error marker a caller may pass. Confirm this is intended.

        Args:
            level: Drowsiness level reported for the current frame.
            lighting: Lighting condition reported for the current frame.

        Returns:
            A ``(sample_rate, samples)`` tuple holding a copy of the loaded
            sound, or ``None`` when no alert should play.
        """
        is_drowsy = level != "Awake"
        is_good_light = lighting != "Low"
        if not (is_drowsy and is_good_light) or self.alert_data is None:
            return None

        # Read the clock once so the comparison and the stored timestamp agree.
        now = time.monotonic()
        if now - self.last_alert_time <= self.cooldown_seconds:
            return None

        self.last_alert_time = now
        logging.info(f"π Drowsiness alert! Repeating every {self.cooldown_seconds}s until 'Awake'.")
        # Hand out a copy so the consumer cannot mutate the cached samples.
        return (self.sample_rate, self.alert_data.copy())
# Module-level alert manager shared by the frame-processing callbacks.
# `or {}` also covers a config whose `alerting:` key is present but empty:
# YAML loads such a key as None, which would otherwise break the .get() calls
# made inside AlertManager.__init__.
alert_manager = AlertManager(CFG.get("alerting") or {})
# ───────────────────────────── Frame Processing for Tab 1 (Image Stream) - UPDATED
def process_live_frame(frame):
    """Process one webcam frame for the image-stream tab.

    Args:
        frame: The frame delivered by the streaming image component, or
            ``None`` when no frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``: the frame returned by the
        detector (a black frame of the same shape if the detector raised),
        a multi-line status string, and either a ``gr.Audio`` carrying the
        alert sound or ``None``. For a ``None`` frame the result is a black
        480x640 image, ``"Status: Inactive"`` and ``None``.
    """
    if frame is None:
        placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
        return (placeholder, "Status: Inactive", None)

    started = time.perf_counter()
    try:
        # Ask the detector to draw its overlays onto the returned frame.
        annotated, indicators = detector.process_frame(frame, draw_visuals=True)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        # Keep the stream alive: show a black frame and report an error state.
        annotated = np.zeros_like(frame)
        indicators = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indicators.get("drowsiness_level", "Awake")
    lighting = indicators.get("lighting", "Good")
    score = indicators.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - started) * 1000.0
    logging.info(f"IMAGE STREAM β {dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    # In low light the level/score lines are replaced by a pause notice.
    if lighting == "Low":
        detail = "Detection paused β low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n" + detail

    audio_payload = alert_manager.trigger_alert(level, lighting)
    if audio_payload:
        audio_out = gr.Audio(value=audio_payload, autoplay=True)
    else:
        audio_out = None
    return annotated, status_txt, audio_out
def process_for_stats_only(frame):
    """Analyse one frame and report status and alert only, with no image output.

    The detector is called with ``draw_visuals=False`` and the frame it
    returns is discarded.

    Args:
        frame: The frame delivered by the streaming image component, or
            ``None`` when no frame is available.

    Returns:
        A 2-tuple ``(status_text, audio)`` where ``audio`` is a ``gr.Audio``
        carrying the alert sound or ``None``. For a ``None`` frame the
        result is ``("Status: Inactive", None)``.
    """
    if frame is None:
        return "Status: Inactive", None

    started = time.perf_counter()
    try:
        # Visuals are disabled, so the first returned value is not used.
        _, indicators = detector.process_frame(frame, draw_visuals=False)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        indicators = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indicators.get("drowsiness_level", "Awake")
    lighting = indicators.get("lighting", "Good")
    score = indicators.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - started) * 1000.0
    logging.info(f"ANALYSIS ONLY β {dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    status_lines = [
        f"Status: {level} (Score: {score:.2f})",
        f"Lighting: {lighting}",
        f"Processing Time: {dt_ms:.1f} ms",
    ]
    status_txt = "\n".join(status_lines)

    audio_payload = alert_manager.trigger_alert(level, lighting)
    if audio_payload:
        return status_txt, gr.Audio(value=audio_payload, autoplay=True)
    return status_txt, None
# ───────────────────────────── UI Definition (Unchanged)
def create_readme_tab():
    """Render the 'About' tab as a single Markdown component.

    Must be called inside an active Gradio layout context; returns nothing.
    """
    # The text is kept flush-left so Markdown does not treat it as a code block.
    about_markdown = """
<div align="center">
<img src="https://em-content.zobj.net/source/samsung/380/automobile_1f697.png" alt="Car Emoji" width="100"/>
<h1>Drive Paddy (Gradio Edition)</h1>
<p><strong>Your AI-Powered Drowsiness Detection Assistant</strong></p>
</div>
---
## π Features
- **Real-Time Webcam Streaming**: Directly processes your live camera feed for immediate feedback.
- **Efficient Geometric Analysis**: Uses `MediaPipe` for high-performance facial landmark detection.
- **Multi-Signal Analysis**: Detects eye closure (EAR), yawns (MAR), and head-nodding.
- **Stateful Alert System**: Plays a clear audio alert for new drowsiness events and intelligently re-arms itself, preventing alert fatigue.
- **Low-Light Warning**: Automatically detects and warns about poor lighting conditions.
- **Configurable**: Key detection thresholds and settings can be tuned via `config.yaml`.
---
## π οΈ How It Works
1. **Video Streaming**: The `gradio.Image` component captures the camera feed.
2. **Frame Processing**: Each frame is sent to the `GeometricProcessor`.
3. **Stateful Alerting**: The `AlertManager` class uses internal state to decide if a *new* alert should be triggered.
4. **Dynamic Updates**: The processed video, status text, and audio alerts are sent back to the frontend for a seamless real-time experience.
---
## π‘ Understanding the Live Status
The status panel provides real-time feedback on the following parameters:
- **`Lighting`**: Indicates the ambient light conditions.
- `Good`: Sufficient light for reliable detection.
- `Low`: Insufficient light. Detection is paused as the results would be unreliable.
- **`Status`**: The overall assessed level of driver alertness.
- `Awake`: The driver appears alert.
- `Slightly Drowsy`: Early signs of fatigue have been detected.
- `Very Drowsy`: Strong indicators of drowsiness are present. An alert is triggered.
- **`Score`**: A numerical value representing the accumulated evidence of drowsiness based on the weighted indicators (eye closure, yawning, head pose). A higher score corresponds to a greater level of detected drowsiness.
"""
    gr.Markdown(about_markdown)
def create_detection_tab():
    """Build the 'Live Detection' tab (image stream).

    Lays out a streaming webcam input next to the processed image, a status
    textbox and a hidden autoplay audio component, then wires the webcam
    stream to ``process_live_frame``. Must be called inside an active
    Gradio layout context; returns nothing.
    """
    gr.Markdown("## πΉ Live Drowsiness Detection (Image Stream)")
    gr.Markdown("This feed provides the lowest latency by streaming processed images directly.")

    # NOTE(review): column nesting reconstructed from a copy of the file that
    # had lost its indentation - confirm against the deployed layout.
    with gr.Row():
        with gr.Column(scale=2):
            webcam_in = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            processed_view = gr.Image(label="Processed Feed")
            status_box = gr.Textbox(label="Live Status", lines=3, interactive=False)
            # Hidden player: it exists only so returned alert audio can autoplay.
            alert_player = gr.Audio(label="Alert", autoplay=True, visible=False)

    # Output order must match the tuple returned by process_live_frame.
    stream_targets = [processed_view, status_box, alert_player]
    webcam_in.stream(fn=process_live_frame, inputs=[webcam_in], outputs=stream_targets)
def create_analysis_only_tab():
    """Build the 'Analysis-Only Mode' tab.

    Lays out a streaming webcam input beside a status textbox and a hidden
    autoplay audio component, then wires the webcam stream to
    ``process_for_stats_only``. No processed image is displayed. Must be
    called inside an active Gradio layout context; returns nothing.
    """
    gr.Markdown("## β‘ Analysis-Only Mode")
    gr.Markdown("This mode provides the fastest possible analysis by not sending any video back to the browser. The camera is still active for detection, but you will only see the live status and hear alerts.")

    # NOTE(review): column nesting reconstructed from a copy of the file that
    # had lost its indentation - confirm against the deployed layout.
    with gr.Row():
        with gr.Column(scale=1):
            # The webcam preview stays visible so the user can see it is
            # running; there is deliberately no video output component.
            webcam_in = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed (for detection)")
        with gr.Column(scale=1):
            status_box = gr.Textbox(label="Live Status & Performance", lines=4, interactive=False)
            alert_player = gr.Audio(label="Alert", autoplay=True, visible=False)

    # Output order must match the tuple returned by process_for_stats_only.
    stream_targets = [status_box, alert_player]
    webcam_in.stream(fn=process_for_stats_only, inputs=[webcam_in], outputs=stream_targets)
# --- Main App Interface with Tabs ---
# Assemble the top-level Blocks app: a title followed by one tab per builder,
# in display order. Each builder adds its components to the tab that is open
# when it is called.
with gr.Blocks(title="Drive Paddy β Drowsiness Detection", theme=gr.themes.Soft()) as app:
    gr.Markdown("# π **Drive Paddy**")
    with gr.Tabs():
        for _tab_label, _build_tab in (
            ("Live Detection", create_detection_tab),
            ("Analysis-Only Mode", create_analysis_only_tab),
            ("About this App", create_readme_tab),
        ):
            with gr.TabItem(_tab_label):
                _build_tab()

# Start the server only when the file is run as a script, not when imported.
if __name__ == "__main__":
    logging.info("Launching Gradio app...")
    app.launch(debug=True)
|