Spaces:
Sleeping
Sleeping
File size: 7,685 Bytes
a97f941 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
#!/usr/bin/env python3
"""
SafetyMaster Pro - Webcam Streaming Version
Optimized for cloud deployment with client-side webcam capture
"""
import cv2
import base64
import json
import time
import os
import numpy as np
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import threading
from datetime import datetime
from io import BytesIO
from PIL import Image
from safety_detector import SafetyDetector
# Flask application object; the HTTP routes below are registered on it.
app = Flask(__name__)
# The session secret is read from the SECRET_KEY environment variable; the
# literal below is used only when that variable is unset.
# NOTE(review): the fallback is a fixed value visible in the source — set
# SECRET_KEY explicitly in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'safety_monitor_secret_key')
# Socket.IO server attached to the Flask app, using the 'threading' async mode.
# NOTE(review): cors_allowed_origins="*" presumably accepts connections from
# any origin — confirm that this is intended for the deployment.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# Global variables
# detector: shared SafetyDetector instance; stays None until
# initialize_components() succeeds.
detector = None
# violation_log: in-memory list of violation entries appended by the
# 'process_frame' handler, which also caps it at the 50 newest entries.
violation_log = []
def initialize_components():
    """Create the module-wide safety detector.

    Returns:
        True when ``SafetyDetector()`` was constructed and stored in the
        global ``detector``; False when construction raised. The error is
        printed rather than propagated.
    """
    global detector
    succeeded = False
    try:
        detector = SafetyDetector()
        print("Safety detector initialized successfully")
        succeeded = True
    except Exception as err:
        print(f"Error initializing components: {err}")
    return succeeded
def base64_to_frame(base64_string):
    """Convert a base64-encoded image into an OpenCV BGR frame.

    Args:
        base64_string: Base64 text (``str`` or ``bytes``) of an encoded
            image. A leading ``data:<mime>;base64,`` data-URL header, as
            produced by a browser canvas, is accepted and stripped.

    Returns:
        A 3-channel BGR ``numpy`` array, or None when the input is empty
        or cannot be decoded as an image (the error is printed).
    """
    if not base64_string:
        print("Error converting base64 to frame: empty input")
        return None
    try:
        if isinstance(base64_string, (bytes, bytearray)):
            text = bytes(base64_string).decode('ascii')
        else:
            text = base64_string
        text = text.strip()
        # Browsers typically send "data:image/jpeg;base64,<payload>"; the
        # header characters would otherwise be decoded as image data.
        if text.startswith('data:'):
            text = text.partition(',')[2]
        # Decode base64 to bytes
        image_bytes = base64.b64decode(text)
        # Normalise to RGB first so grayscale, palette and RGBA images all
        # yield a 3-channel array that cvtColor can handle.
        with Image.open(BytesIO(image_bytes)) as pil_image:
            rgb_pixels = np.array(pil_image.convert('RGB'))
        # Convert to OpenCV format (BGR)
        return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
    except Exception as e:
        # Broad on purpose: decoding untrusted client data can fail in many
        # ways (base64, image parsing, OpenCV) and must never crash the caller.
        print(f"Error converting base64 to frame: {e}")
        return None
def frame_to_base64(frame):
    """Convert an OpenCV frame to a base64-encoded JPEG string.

    Args:
        frame: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG bytes (quality 85) as a base64 ``str``, or None when no
        frame was given or encoding failed (the error is printed).
    """
    if frame is None:
        print("Error converting frame to base64: no frame provided")
        return None
    try:
        # Encode frame as JPEG; imencode reports failure through its first
        # return value as well as through exceptions, so check both.
        encoded_ok, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not encoded_ok:
            print("Error converting frame to base64: JPEG encoding failed")
            return None
        # Convert to base64
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        print(f"Error converting frame to base64: {e}")
        return None
@app.route('/')
def dashboard():
    """Render the ``dashboard_webcam.html`` template for the root URL."""
    page = render_template('dashboard_webcam.html')
    return page
@app.route('/health')
def health_check():
    """Return a JSON health report, including whether the detector is loaded."""
    report = dict(
        status='healthy',
        service='SafetyMaster Pro (Webcam)',
        timestamp=datetime.now().isoformat(),
        detector_loaded=detector is not None,
    )
    return jsonify(report)
@app.route('/api/violations')
def get_violations():
    """Return the 20 most recent logged violations and the total log size.

    Responds with HTTP 500 and an error message if building the response
    raises.
    """
    try:
        recent = violation_log[-20:]  # Last 20 violations
        total = len(violation_log)
        body = {'success': True, 'violations': recent, 'total_count': total}
        return jsonify(body)
    except Exception as e:
        failure = {
            'success': False,
            'message': f'Error getting violations: {str(e)}',
        }
        return jsonify(failure), 500
@app.route('/api/model_info')
def get_model_info():
    """Return the loaded model's classes and device as JSON.

    Responds with HTTP 500 when the detector is not initialized or when
    reading the model information raises.
    """
    try:
        # Guard first: nothing to report without a detector.
        if not detector:
            missing = {'success': False, 'message': 'Detector not initialized'}
            return jsonify(missing), 500
        info = {
            'success': True,
            'model_classes': detector.get_model_classes(),
            'device': detector.device,
        }
        return jsonify(info)
    except Exception as e:
        failure = {
            'success': False,
            'message': f'Error getting model info: {str(e)}',
        }
        return jsonify(failure), 500
@socketio.on('connect')
def handle_connect():
    """Log a new client connection and send it a 'status' greeting."""
    print('Client connected')
    greeting = {'message': 'Connected to Safety Monitor'}
    emit('status', greeting)
@socketio.on('disconnect')
def handle_disconnect():
    """Log that a client has disconnected; nothing is sent back."""
    notice = 'Client disconnected'
    print(notice)
def _record_violations(violations, max_entries=50):
    """Append detector violations to ``violation_log`` and trim the log.

    Args:
        violations: Sequence of violation dicts. Each must carry 'type'
            and 'description'; 'severity' and 'count' default to 'high'
            and 1 when absent.
        max_entries: Number of newest entries kept in ``violation_log``.
    """
    if not violations:
        return
    current_time = datetime.now().isoformat()
    # Build all entries first so a malformed violation does not leave the
    # log half-updated.
    entries = [
        {
            'timestamp': current_time,
            'type': violation['type'],
            'description': violation['description'],
            'severity': violation.get('severity', 'high'),
            'count': violation.get('count', 1),
        }
        for violation in violations
    ]
    violation_log.extend(entries)
    # Keep only the newest entries; mutate in place so every reference to
    # the shared list sees the trimmed log.
    excess = len(violation_log) - max_entries
    if excess > 0:
        del violation_log[:excess]


@socketio.on('process_frame')
def handle_process_frame(data):
    """Process a frame sent from the client webcam.

    Decodes the base64 image under ``data['frame']``, runs the detector,
    logs any violations, and emits a 'detection_result' event carrying
    the detection summary and the annotated frame as base64 JPEG.

    Args:
        data: Event payload; expected to be a dict with a 'frame' key.

    Emits:
        'detection_result' on success; 'error' with a message when the
        detector is missing, the payload has no usable frame, the frame
        cannot be decoded, or processing raises.
    """
    try:
        if not detector:
            emit('error', {'message': 'Detector not initialized'})
            return
        # Get frame data; a payload that is not a dict is treated the same
        # as a missing frame instead of surfacing an AttributeError.
        frame_data = data.get('frame') if isinstance(data, dict) else None
        if not frame_data:
            emit('error', {'message': 'No frame data received'})
            return
        # Convert base64 to OpenCV frame
        frame = base64_to_frame(frame_data)
        if frame is None:
            emit('error', {'message': 'Failed to decode frame'})
            return
        # Run AI detection
        results = detector.detect_safety_violations(frame)
        # Draw detections on frame
        annotated_frame = detector.draw_detections(frame, results)
        # Convert back to base64 (None if re-encoding failed)
        processed_frame_base64 = frame_to_base64(annotated_frame)
        # Log violations
        _record_violations(results['violations'])
        # Send results back to client
        response_data = {
            'people_count': results['people_count'],
            'safety_equipment': results['safety_equipment'],
            'violations': results['violations'],
            'fps': results['fps'],
            'timestamp': datetime.now().isoformat(),
            'processed_frame': processed_frame_base64
        }
        emit('detection_result', response_data)
    except Exception as e:
        # Top-level boundary for a socket event: report and keep serving.
        print(f"Error processing frame: {e}")
        emit('error', {'message': f'Error processing frame: {str(e)}'})
@socketio.on('request_model_info')
def handle_model_info_request():
    """Emit the model's classes and device to the requesting client.

    Emits 'model_info' on success, or 'error' when the detector is not
    initialized or reading the model information raises.
    """
    try:
        # Guard first: nothing to report without a detector.
        if not detector:
            emit('error', {'message': 'Detector not initialized'})
            return
        payload = {
            'classes': detector.get_model_classes(),
            'device': detector.device,
        }
        emit('model_info', payload)
    except Exception as e:
        emit('error', {'message': f'Error getting model info: {str(e)}'})
def main():
    """Initialize the detector and run the Socket.IO web server.

    Returns early, after printing a failure message, when
    ``initialize_components()`` reports failure. Otherwise serves until
    interrupted with Ctrl+C.
    """
    print("🤖 Loading AI model (this may take a moment on first run)...")
    print(" Using pre-uploaded models for faster startup...")
    components_ready = initialize_components()
    if not components_ready:
        print("❌ Failed to initialize components")
        return

    # Get port from environment variable (Hugging Face Spaces uses 7860)
    port = int(os.environ.get('PORT', 7860))
    host = '0.0.0.0'  # Required for cloud deployment

    banner = (
        "🚀 Starting SafetyMaster Pro (Webcam Version)...",
        f" Running on: http://{host}:{port}",
        " 🎥 Webcam streaming enabled for cloud deployment",
        " Press Ctrl+C to stop",
    )
    for line in banner:
        print(line)

    server_options = dict(
        host=host,
        port=port,
        debug=False,  # Disable debug in production
        use_reloader=False,  # Disable reloader in production
        allow_unsafe_werkzeug=True,
    )
    try:
        socketio.run(app, **server_options)
    except KeyboardInterrupt:
        print("\n🛑 Shutting down Safety Monitor...")
        print(" Safety Monitor stopped")
# Run the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()