File size: 10,730 Bytes
447da48
60b383e
 
 
 
 
 
 
 
447da48
a7ec466
447da48
a7ec466
ba6a8ea
935769d
b9d2bb7
 
935769d
b9d2bb7
 
 
935769d
 
447da48
 
 
 
 
 
 
 
b9d2bb7
935769d
ba6a8ea
447da48
a7ec466
8ae9de4
7c9f785
a7ec466
d023885
8ae9de4
 
 
447da48
 
 
 
8ae9de4
a7ec466
e0dcec0
8ae9de4
 
447da48
a7ec466
 
8ae9de4
 
 
d023885
 
 
8ae9de4
 
7c9f785
d023885
 
 
 
 
 
 
 
 
 
 
 
7c9f785
ba6a8ea
d023885
447da48
 
 
a7ec466
3a175cd
447da48
8ae9de4
935769d
60b383e
d023885
447da48
60b383e
 
447da48
8ae9de4
 
60b383e
935769d
60b383e
8ae9de4
935769d
447da48
8ae9de4
447da48
 
8ae9de4
 
447da48
8ae9de4
 
d918bfb
 
 
 
 
6bf90e5
d918bfb
6bf90e5
d918bfb
6bf90e5
d023885
447da48
6bf90e5
 
 
 
 
 
 
 
d918bfb
 
0534bd0
e5303f6
 
d918bfb
 
e5303f6
d918bfb
 
 
 
 
6bf90e5
447da48
 
9501d84
 
447da48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9501d84
 
447da48
 
 
 
 
 
 
 
 
 
9501d84
447da48
 
 
 
 
 
d918bfb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5303f6
447da48
 
 
9501d84
 
b6f77f0
1365143
d918bfb
 
f131341
 
5101617
8977de8
447da48
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# app.py
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
import cv2 # Retained for detector compatibility

# Assuming the factory and processor are in the src directory
from src.detection.factory import get_detector

# ───────────────────────────── logging
# Configure the root logger once at import time: INFO level, time-only
# timestamps, and a box-drawing bar (U+2502) between timestamp and message.
# The bar had been mis-decoded into a three-character sequence, which made
# every log line start with garbage.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
# Pull environment variables from a local .env file (if one exists) before
# anything reads the configuration.
load_dotenv()
try:
    # YAML files are UTF-8 by specification; do not depend on the platform's
    # default text encoding.
    with open("config.yaml", encoding="utf-8") as f:
        # safe_load returns None for an empty document. Normalise that to an
        # empty dict so the CFG.get(...) calls further down cannot fail with
        # "'NoneType' object has no attribute 'get'" on a blank config file.
        CFG = yaml.safe_load(f) or {}
except FileNotFoundError:
    logging.error("FATAL: config.yaml not found. Please ensure the file exists.")
    # Create a dummy CFG to prevent crashing the app on load
    CFG = {"alerting": {}, "geometric_settings": {}}


# Build the detector selected by the configuration; the factory lives in
# src.detection.factory.
detector = get_detector(CFG)

# ───────────────────────────── Alert Manager Class (Unchanged)
class AlertManager:
    """Decide when the audible drowsiness alert should be played.

    The alert sound is loaded once at construction. ``trigger_alert`` is then
    called for every processed frame and returns the sound at most once per
    cooldown period while the reported level indicates drowsiness.
    """

    # Levels that must never produce an alert. "Awake" is the normal state.
    # "Error" is the placeholder level the frame handlers in this file report
    # when the detector raised; it says nothing about the driver, so it must
    # not play (or log) a drowsiness alarm.
    _SILENT_LEVELS = ("Awake", "Error")

    def __init__(self, config):
        """Read the alert settings and load the alert sound.

        Args:
            config: Mapping with the optional keys ``alert_cooldown_seconds``
                (default 5) and ``alert_sound_path``.
        """
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        # Start at -inf rather than 0: time.monotonic() has an undefined
        # reference point, so "now - 0" is not guaranteed to exceed the
        # cooldown and the very first alert could otherwise be suppressed.
        self.last_alert_time = float("-inf")
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        """Load the alert sound into ``alert_data`` / ``sample_rate``.

        Both attributes are always defined afterwards; they are ``None`` when
        the path is missing or the file cannot be read, in which case alerts
        stay silent.

        Args:
            wav_path: Path of the sound file, or a falsy value for "no sound".
        """
        # Reset both attributes up front so that every exit path leaves them
        # in a consistent state (the failure path used to leave sample_rate
        # undefined).
        self.alert_data = None
        self.sample_rate = None

        if not wav_path or not os.path.exists(wav_path):
            logging.warning(f"Alert sound not found at '{wav_path}'. Alerts will be silent.")
            return

        try:
            data, sr = sf.read(wav_path, dtype="int16")
        except Exception as e:
            # Best effort: a broken sound file must not stop the app.
            logging.error(f"Failed to load alert sound: {e}")
            return

        self.alert_data = data
        self.sample_rate = sr
        logging.info(f"Loaded alert sound: {wav_path}")

    def trigger_alert(self, level, lighting):
        """Return the alert sound if an alert is due for this frame.

        An alert is due when a sound is loaded, ``level`` is not one of the
        silent levels, ``lighting`` is not ``"Low"``, and more than
        ``cooldown_seconds`` have passed since the previous alert. The alert
        therefore repeats for as long as drowsiness persists.

        Args:
            level: Drowsiness level reported for the frame.
            lighting: Lighting condition reported for the frame.

        Returns:
            ``(sample_rate, samples)`` with a fresh copy of the samples when
            an alert fires, otherwise ``None``.
        """
        if self.alert_data is None or level in self._SILENT_LEVELS or lighting == "Low":
            return None

        # Read the clock once so the comparison and the stored timestamp agree.
        now = time.monotonic()
        if now - self.last_alert_time <= self.cooldown_seconds:
            return None

        self.last_alert_time = now
        logging.info(f"🔊 Drowsiness alert! Repeating every {self.cooldown_seconds}s until 'Awake'.")
        # Hand out a copy so callers cannot modify the cached samples.
        return (self.sample_rate, self.alert_data.copy())

        
# Shared alert manager used by both streaming handlers. "or {}" also covers a
# config file whose "alerting:" key is present but empty (YAML loads that as
# None, which .get's default would not replace).
alert_manager = AlertManager(CFG.get("alerting") or {})

# ───────────────────────────── Frame Processing for Tab 1 (Image Stream) - UPDATED
def process_live_frame(frame):
    """Process one webcam frame for the image-stream tab.

    Args:
        frame: Frame delivered by the streaming camera input, or ``None`` when
            no frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``. ``image`` is the frame
        returned by the detector (a black frame when there is no input or the
        detector failed), ``status_text`` is the multi-line status shown in
        the UI, and ``audio`` is a ``gr.Audio`` carrying the alert sound when
        an alert fires, otherwise ``None``.
    """
    if frame is None:
        return (np.zeros((480, 640, 3), dtype=np.uint8), "Status: Inactive", None)

    t0 = time.perf_counter()
    try:
        # Call with draw_visuals=True
        processed, indic = detector.process_frame(frame, draw_visuals=True)
    except Exception as e:
        # logging.exception records the traceback as well as the message, so a
        # failing detector can actually be diagnosed from the log.
        logging.exception(f"Error processing frame: {e}")
        # frame is known to be non-None here (guard above), so a black frame of
        # the same shape can always be produced.
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"IMAGE STREAM │ {dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}")

    if lighting == "Low":
        detail_txt = "Detection paused – low light."
    else:
        detail_txt = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n" + detail_txt

    audio_payload = alert_manager.trigger_alert(level, lighting)
    # Build the audio output on its own line: in the previous one-line return
    # the conditional silently applied to the third tuple element only.
    audio_out = gr.Audio(value=audio_payload, autoplay=True) if audio_payload is not None else None
    return processed, status_txt, audio_out


def process_for_stats_only(frame):
    """Process one webcam frame without producing any image output.

    The detector is called with ``draw_visuals=False`` and only the status
    text and the optional alert sound are returned.

    Args:
        frame: Frame delivered by the streaming camera input, or ``None`` when
            no frame is available.

    Returns:
        A 2-tuple ``(status_text, audio)``. ``audio`` is a ``gr.Audio``
        carrying the alert sound when an alert fires, otherwise ``None``.
    """
    if frame is None:
        return "Status: Inactive", None

    t0 = time.perf_counter()
    try:
        # Call with draw_visuals=False. The first returned value is not used.
        _, indic = detector.process_frame(frame, draw_visuals=False)
    except Exception as e:
        # logging.exception records the traceback as well as the message, so a
        # failing detector can actually be diagnosed from the log.
        logging.exception(f"Error processing frame: {e}")
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"ANALYSIS ONLY │ {dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}")

    status_txt = (
        f"Status: {level} (Score: {score:.2f})\n"
        f"Lighting: {lighting}\n"
        f"Processing Time: {dt_ms:.1f} ms"
    )

    audio_payload = alert_manager.trigger_alert(level, lighting)
    audio_out = gr.Audio(value=audio_payload, autoplay=True) if audio_payload is not None else None

    return status_txt, audio_out


# ───────────────────────────── UI Definition (Unchanged)
def create_readme_tab():
    """Create the content for the 'About' tab.

    Renders a single Markdown block describing the features, the processing
    pipeline and the meaning of the live status fields. Must be called inside
    an active ``gr.Blocks`` context.
    """
    # The alert description below is kept in line with AlertManager: the alert
    # fires for any drowsy level and repeats after a cooldown while it lasts.
    gr.Markdown(
        """
        <div align="center">
          <img src="https://em-content.zobj.net/source/samsung/380/automobile_1f697.png" alt="Car Emoji" width="100"/>
          <h1>Drive Paddy (Gradio Edition)</h1>
          <p><strong>Your AI-Powered Drowsiness Detection Assistant</strong></p>
        </div>

        ---

        ## 🌟 Features
        - **Real-Time Webcam Streaming**: Directly processes your live camera feed for immediate feedback.
        - **Efficient Geometric Analysis**: Uses `MediaPipe` for high-performance facial landmark detection.
        - **Multi-Signal Analysis**: Detects eye closure (EAR), yawns (MAR), and head-nodding.
        - **Repeating Alert System**: Plays a clear audio alert when drowsiness is detected and repeats it after a configurable cooldown until the driver is `Awake` again.
        - **Low-Light Warning**: Automatically detects and warns about poor lighting conditions.
        - **Configurable**: Key detection thresholds and settings can be tuned via `config.yaml`.

        ---

        ## 🛠️ How It Works
        1.  **Video Streaming**: The `gradio.Image` component captures the camera feed.
        2.  **Frame Processing**: Each frame is sent to the `GeometricProcessor`.
        3.  **Cooldown-Based Alerting**: The `AlertManager` class tracks the time of the last alert and plays the sound again once the cooldown has elapsed while drowsiness persists.
        4.  **Dynamic Updates**: The processed video, status text, and audio alerts are sent back to the frontend for a seamless real-time experience.

        ---

        ## 💡 Understanding the Live Status
        The status panel provides real-time feedback on the following parameters:

        -   **`Lighting`**: Indicates the ambient light conditions.
            -   `Good`: Sufficient light for reliable detection.
            -   `Low`: Insufficient light. Detection is paused as the results would be unreliable.

        -   **`Status`**: The overall assessed level of driver alertness.
            -   `Awake`: The driver appears alert.
            -   `Slightly Drowsy`: Early signs of fatigue have been detected. An alert is triggered.
            -   `Very Drowsy`: Strong indicators of drowsiness are present. An alert is triggered.

        -   **`Score`**: A numerical value representing the accumulated evidence of drowsiness based on the weighted indicators (eye closure, yawning, head pose). A higher score corresponds to a greater level of detected drowsiness.
        """
    )

def create_detection_tab():
    """Create the content for the 'Live Detection' tab (Image Stream).

    Lays out the webcam input next to the processed image, the status text
    box and a hidden autoplay audio component, then wires the camera stream
    to ``process_live_frame``. Must be called inside an active ``gr.Blocks``
    context.
    """
    gr.Markdown("## 📹 Live Drowsiness Detection (Image Stream)")
    gr.Markdown("This feed provides the lowest latency by streaming processed images directly.")
    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            # Hidden on purpose: the component exists only to play the alert.
            out_audio = gr.Audio(label="Alert", autoplay=True, visible=False)

    # The order of outputs matches the tuple returned by process_live_frame.
    cam.stream(
        fn=process_live_frame,
        inputs=[cam],
        outputs=[out_img, out_text, out_audio]
    )

def create_analysis_only_tab():
    """Create the content for the Analysis-Only Mode tab.

    Lays out the webcam input next to the status text box and a hidden
    autoplay audio component, then wires the camera stream to
    ``process_for_stats_only``. No processed image is shown in this mode.
    Must be called inside an active ``gr.Blocks`` context.
    """
    gr.Markdown("## ⚡ Analysis-Only Mode")
    gr.Markdown("This mode provides the fastest possible analysis by not sending any video back to the browser. The camera is still active for detection, but you will only see the live status and hear alerts.")
    with gr.Row():
        with gr.Column(scale=1):
            # The input camera is visible so the user knows it's working,
            # but there is no corresponding video output component.
            cam_analysis = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed (for detection)")
        with gr.Column(scale=1):
            out_text_analysis = gr.Textbox(label="Live Status & Performance", lines=4, interactive=False)
            # Hidden on purpose: the component exists only to play the alert.
            out_audio_analysis = gr.Audio(label="Alert", autoplay=True, visible=False)

    # The order of outputs matches the tuple returned by process_for_stats_only.
    cam_analysis.stream(
        fn=process_for_stats_only,
        inputs=[cam_analysis],
        outputs=[out_text_analysis, out_audio_analysis]
    )


# --- Main App Interface with Tabs ---
# Assemble the three tabs into a single Blocks app. The tab builder functions
# create their components inside whichever TabItem context is active.
with gr.Blocks(title="Drive Paddy – Drowsiness Detection", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🚗 **Drive Paddy**")
    with gr.Tabs():
        with gr.TabItem("Live Detection"):
            create_detection_tab()
        with gr.TabItem("Analysis-Only Mode"):
            create_analysis_only_tab()
        with gr.TabItem("About this App"):
            create_readme_tab()

# Launch only when run as a script, so importing this module (for example by
# a hosting wrapper that picks up `app`) does not start a second server.
if __name__ == "__main__":
    logging.info("Launching Gradio app...")
    app.launch(debug=True)