"""Streamlit app for real-time face mask detection on a WebRTC video stream.

Faces are located with an OpenCV Haar cascade (falling back to OpenCV's DNN
face detector) and each face crop is classified as ``Mask`` / ``No Mask`` by a
Keras model downloaded from the Hugging Face Hub.
"""

import functools
import logging
import os
import sys
import time
import traceback
import urllib.request
from collections import deque
from typing import Any, Dict, List, Optional, Tuple

import av
import cv2
import h5py
import numpy as np
import streamlit as st
import tensorflow as tf
from huggingface_hub import hf_hub_download
from PIL import Image, ImageDraw, ImageFont
from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, WebRtcMode

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set environment variables for headless operation
os.environ["DISPLAY"] = ":0"
os.environ["QT_QPA_PLATFORM"] = "offscreen"
os.environ["OPENCV_HEADLESS"] = "1"

# Set TensorFlow to use CPU only
try:
    tf.config.set_visible_devices([], 'GPU')
    logger.info("TensorFlow configured to use CPU only")
except Exception as e:
    logger.error(f"Error configuring TensorFlow for CPU: {e}")

# Set page config (must be the first Streamlit command of the script)
st.set_page_config(
    page_title="Face Mask Detection",
    page_icon="😷",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for modern styling.
# NOTE(review): the payload is whitespace-only in the source as received; the
# stylesheet appears to have been stripped. Restore it from the original.
st.markdown(""" """, unsafe_allow_html=True)

# Files and remote locations used by the app
HAAR_CASCADE_FILE = "haarcascade_frontalface_default.xml"
HAAR_CASCADE_URL = "https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml"
DNN_MODEL_FILES = {
    "deploy.prototxt": "https://raw.githubusercontent.com/opencv/opencv/master/samples/dnn/face_detector/deploy.prototxt",
    "res10_300x300_ssd_iter_140000.caffemodel": "https://raw.githubusercontent.com/opencv/opencv_3rdparty/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel",
}
MODEL_FILENAME = "mask_detection_model.h5"
MODEL_REPO_ID = "sreenathsree1578/face_mask_detection"

# Global variables for model and processor
model = None
face_cascade = None
dnn_net = None
model_input_size = (128, 128)  # From model config
class_names = ['Mask', 'No Mask']  # From model config
model_loaded = False
face_detector_loaded = False
use_dnn_detector = False


def _download_file(url: str, destination: str) -> None:
    """Download *url* to *destination* without leaving a partial file behind.

    The data is written to ``<destination>.part`` and only moved into place
    once the transfer finished, so an interrupted download is never mistaken
    for a complete file by a later ``os.path.exists`` check.
    """
    partial_path = destination + ".part"
    try:
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, destination)
    finally:
        # Only still present when the download or the move failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)


def download_haarcascade():
    """Download the Haar cascade file if it doesn't exist.

    Returns:
        True when the file is available locally, False when the download failed.
    """
    if os.path.exists(HAAR_CASCADE_FILE):
        return True
    try:
        with st.spinner("Downloading Haar cascade file..."):
            _download_file(HAAR_CASCADE_URL, HAAR_CASCADE_FILE)
        st.success("Haar cascade file downloaded successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to download Haar cascade file: {str(e)}")
        return False


def download_dnn_model():
    """Download the DNN face detection model files that are missing locally.

    Returns:
        True when every file is available, False when any download failed.
    """
    all_downloaded = True
    for filename, url in DNN_MODEL_FILES.items():
        if os.path.exists(filename):
            continue
        try:
            with st.spinner(f"Downloading {filename}..."):
                _download_file(url, filename)
            st.success(f"{filename} downloaded successfully!")
        except Exception as e:
            st.error(f"Failed to download {filename}: {str(e)}")
            all_downloaded = False
    return all_downloaded


def load_model() -> Any:
    """Load the Keras face mask detection model from the Hugging Face Hub.

    The model file is downloaded (or taken from the hub cache) and loaded
    first with the default settings and then, if that fails, with
    ``compile=False``. The result is stored in the module-level ``model`` and
    ``model_loaded`` globals.

    Returns:
        The loaded model, or None when the download or every load attempt failed.
    """
    global model, model_loaded

    if model is not None:
        return model

    try:
        # Download model from Hugging Face Hub
        with st.spinner("Downloading model from Hugging Face Hub..."):
            model_path = hf_hub_download(repo_id=MODEL_REPO_ID, filename=MODEL_FILENAME)
    except Exception as e:
        logger.error(f"Error loading model from Hugging Face: {e}")
        st.error(f"Error loading model from Hugging Face: {str(e)}")
        model_loaded = False
        return None

    # Each attempt: (description used in log messages, extra load_model kwargs)
    attempts = (
        ("standard method", {}),
        ("compile=False", {"compile": False}),
    )
    for description, load_kwargs in attempts:
        try:
            logger.info(f"Attempting to load model with {description}")
            model = tf.keras.models.load_model(model_path, **load_kwargs)
        except Exception as e:
            logger.error(f"Model loading with {description} failed: {e}")
            continue
        model_loaded = True
        logger.info(f"Model loaded successfully with {description}")
        return model

    # If all methods failed
    model_loaded = False
    return None


def _try_load_haar_cascade() -> Optional[Any]:
    """Return a ready-to-use Haar cascade classifier, or None if unavailable."""
    if not download_haarcascade():
        logger.info("Haar cascade download failed, trying DNN detector")
        return None
    try:
        cascade = cv2.CascadeClassifier(HAAR_CASCADE_FILE)
    except Exception as e:
        logger.error(f"Error loading Haar cascade: {e}")
        return None
    # CascadeClassifier does not raise on a bad file; it just stays empty.
    if cascade.empty():
        logger.info("Haar cascade is empty, trying DNN detector")
        return None
    return cascade


def load_face_detector():
    """Load OpenCV's Haar cascade, falling back to the DNN face detector.

    The Haar cascade is tried first unless ``use_dnn_detector`` is already
    set. The outcome is recorded in the ``face_cascade`` / ``dnn_net`` /
    ``face_detector_loaded`` / ``use_dnn_detector`` globals.

    Returns:
        True when a detector was loaded, False otherwise.
    """
    global face_cascade, dnn_net, face_detector_loaded, use_dnn_detector

    # First try to load Haar cascade
    if not use_dnn_detector:
        cascade = _try_load_haar_cascade()
        if cascade is not None:
            face_cascade = cascade
            face_detector_loaded = True
            return True

    # If we're here, we need to use the DNN detector
    use_dnn_detector = True
    if not download_dnn_model():
        face_detector_loaded = False
        return False

    try:
        dnn_net = cv2.dnn.readNetFromCaffe(
            "deploy.prototxt", "res10_300x300_ssd_iter_140000.caffemodel"
        )
    except Exception as e:
        logger.error(f"Error loading DNN model: {e}")
        face_detector_loaded = False
        return False

    face_detector_loaded = True
    return True


def _clip_box(x: int, y: int, w: int, h: int,
              frame_width: int, frame_height: int) -> Optional[Tuple[int, int, int, int]]:
    """Clip an (x, y, w, h) box to the frame; return None if nothing remains."""
    left = max(int(x), 0)
    top = max(int(y), 0)
    right = min(int(x) + int(w), frame_width)
    bottom = min(int(y) + int(h), frame_height)
    if right <= left or bottom <= top:
        return None
    return left, top, right - left, bottom - top


def detect_faces_haar(image: np.ndarray, scale_factor: float = 1.1,
                      min_neighbors: int = 5) -> List[Tuple[int, int, int, int]]:
    """Detect faces using the Haar cascade.

    Args:
        image: Frame that is converted with ``COLOR_BGR2GRAY`` before detection.
        scale_factor: ``scaleFactor`` passed to ``detectMultiScale``.
        min_neighbors: ``minNeighbors`` passed to ``detectMultiScale``.

    Returns:
        List of (x, y, w, h) boxes as plain Python ints.
    """
    # Convert to grayscale for face detection
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(
        gray,
        scaleFactor=scale_factor,
        minNeighbors=min_neighbors,
        minSize=(30, 30),
        flags=cv2.CASCADE_SCALE_IMAGE
    )
    return [(int(x), int(y), int(w), int(h)) for (x, y, w, h) in faces]


def detect_faces_dnn(image: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """Detect faces using the DNN model.

    Returns:
        List of (x, y, w, h) boxes, clipped to the image bounds.
    """
    frame_height, frame_width = image.shape[:2]

    # Create a blob from the image
    blob = cv2.dnn.blobFromImage(
        cv2.resize(image, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0)
    )

    # Pass the blob through the network and get the detections
    dnn_net.setInput(blob)
    detections = dnn_net.forward()

    # Box coordinates are scaled from the network output to pixel units.
    to_pixels = np.array([frame_width, frame_height, frame_width, frame_height])

    faces = []
    for i in range(detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        # Filter out weak detections
        if confidence <= 0.5:
            continue
        start_x, start_y, end_x, end_y = (detections[0, 0, i, 3:7] * to_pixels).astype("int")
        # The network may report corners outside the frame; a negative start
        # would silently wrap around when the box is later used as a slice.
        clipped = _clip_box(start_x, start_y, end_x - start_x, end_y - start_y,
                            frame_width, frame_height)
        if clipped is not None:
            faces.append(clipped)
    return faces


def detect_faces(image: np.ndarray, scale_factor: float = 1.1,
                 min_neighbors: int = 5) -> List[Tuple[int, int, int, int]]:
    """Detect faces in the image using the Haar cascade or the DNN method.

    ``scale_factor`` and ``min_neighbors`` only apply to the Haar cascade.
    """
    if use_dnn_detector:
        return detect_faces_dnn(image)
    return detect_faces_haar(image, scale_factor, min_neighbors)


def preprocess_image(image: np.ndarray) -> np.ndarray:
    """Preprocess an image for model inference.

    Resizes to ``model_input_size``, scales values to [0, 1] as float32 and
    adds a leading batch dimension.
    """
    # NOTE(review): the crop is passed in the caller's channel order (BGR in
    # the video path) — confirm this matches how the model was trained.
    resized = cv2.resize(image, model_input_size)
    normalized = resized.astype(np.float32) / 255.0
    return np.expand_dims(normalized, axis=0)


def classify_faces(image: np.ndarray, faces: List[Tuple[int, int, int, int]],
                   confidence_threshold: float = 0.5,
                   classifier: Any = None) -> List[Dict]:
    """Classify each detected face as mask or no mask.

    All face crops of a frame are classified in a single ``predict`` call.

    Args:
        image: Frame the face boxes refer to.
        faces: (x, y, w, h) boxes; they are clipped to the frame before use.
        confidence_threshold: Minimum top-class score for a detection to be kept.
        classifier: Model to use; defaults to the module-level ``model``.

    Returns:
        List of dicts with ``label``, ``score`` and ``box``
        (``xmin``/``ymin``/``xmax``/``ymax``). Empty when no model is loaded
        or classification fails.
    """
    active_model = classifier
    if active_model is None and model_loaded:
        active_model = model
    if active_model is None:
        return []

    frame_height, frame_width = image.shape[:2]
    boxes = []
    crops = []
    for face in faces:
        clipped = _clip_box(*face, frame_width, frame_height)
        if clipped is None:
            # Nothing of this box lies inside the frame.
            continue
        x, y, w, h = clipped
        boxes.append(clipped)
        crops.append(preprocess_image(image[y:y + h, x:x + w])[0])

    if not crops:
        return []

    # This function runs on the video worker thread, where Streamlit UI calls
    # are not available, so failures are only logged.
    try:
        predictions = active_model.predict(np.stack(crops), verbose=0)
    except Exception as e:
        logger.warning(f"Error classifying face: {e}")
        return []

    detections = []
    for (x, y, w, h), scores in zip(boxes, predictions):
        class_id = int(np.argmax(scores))
        confidence = float(scores[class_id])
        if confidence >= confidence_threshold:
            detections.append({
                "label": class_names[class_id],
                "score": confidence,
                "box": {"xmin": x, "ymin": y, "xmax": x + w, "ymax": y + h}
            })
    return detections


@functools.lru_cache(maxsize=1)
def _load_label_font() -> Any:
    """Return the font used for box labels (loaded once, then cached)."""
    try:
        return ImageFont.truetype("arial.ttf", 16)
    except OSError:
        # Font file not available on this system.
        return ImageFont.load_default()


def draw_detections(image: np.ndarray, detections: List[Dict]) -> np.ndarray:
    """
    Draw bounding boxes and labels on the image.

    Colors are given as RGB tuples and drawn with PIL, so the image must be
    in RGB channel order for them to appear as intended.

    Args:
        image: Input image as numpy array (RGB channel order)
        detections: List of detection dictionaries

    Returns:
        Annotated image as numpy array
    """
    pil_image = Image.fromarray(image)
    draw = ImageDraw.Draw(pil_image)
    font = _load_label_font()

    # Define colors for different classes
    colors = {
        "Mask": (0, 255, 0),  # Green
        "No Mask": (255, 0, 0),  # Red
    }

    for detection in detections:
        try:
            box = detection["box"]
            xmin, ymin, xmax, ymax = box["xmin"], box["ymin"], box["xmax"], box["ymax"]
            label = detection["label"]
            confidence = detection["score"]
            color = colors.get(label, (0, 0, 255))  # Default to blue if label not found

            # Draw bounding box
            draw.rectangle([(xmin, ymin), (xmax, ymax)], outline=color, width=3)

            # Create label text with confidence and measure it
            label_text = f"{label}: {confidence:.2%}"
            text_bbox = draw.textbbox((0, 0), label_text, font=font)
            text_width = text_bbox[2] - text_bbox[0]
            text_height = text_bbox[3] - text_bbox[1]

            # Put the label above the box; if that would leave the frame,
            # place it just inside the top edge of the box instead.
            label_top = ymin - text_height - 5
            if label_top < 0:
                label_top = ymin

            # Draw filled rectangle for text background, then the text
            draw.rectangle(
                [(xmin, label_top), (xmin + text_width + 10, label_top + text_height)],
                fill=color
            )
            draw.text((xmin + 5, label_top), label_text, fill="white", font=font)
        except Exception as e:
            # Runs on the video worker thread: log only, no Streamlit UI calls.
            logger.warning(f"Error drawing detection: {e}")

    return np.array(pil_image)


class FaceMaskProcessor(VideoProcessorBase):
    """Video processor class for real-time face mask detection.

    The public attributes may be reassigned while the stream is running to
    change the behaviour of subsequent frames.
    """

    def __init__(self, model: Any, target_size: Tuple[int, int] = (640, 480),
                 confidence_threshold: float = 0.5, mirror: bool = False,
                 scale_factor: float = 1.1, min_neighbors: int = 5):
        self.model = model
        self.target_size = target_size  # (width, height)
        self.confidence_threshold = confidence_threshold
        self.mirror = mirror
        self.scale_factor = scale_factor  # Haar cascade only
        self.min_neighbors = min_neighbors  # Haar cascade only
        self.frame_count = 0
        # Keep last 30 measurements
        self.processing_times = deque(maxlen=30)

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Detect, classify and annotate faces in one incoming video frame."""
        start_time = time.time()

        # Convert frame to numpy array
        img = frame.to_ndarray(format="bgr24")

        # Mirror the image if requested
        if self.mirror:
            img = cv2.flip(img, 1)

        # Resize frame if needed (shape is (height, width, ...))
        target_size = tuple(self.target_size)
        if img.shape[:2][::-1] != target_size:
            img = cv2.resize(img, target_size)

        faces = detect_faces(img, self.scale_factor, self.min_neighbors)
        detections = classify_faces(img, faces, self.confidence_threshold, self.model)

        # draw_detections paints RGB colors with PIL, so hand it an RGB frame;
        # drawing on the BGR array would turn the red "No Mask" box blue.
        rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        annotated_img = draw_detections(rgb_img, detections)

        self.processing_times.append(time.time() - start_time)
        self.frame_count += 1

        # Convert back to VideoFrame
        return av.VideoFrame.from_ndarray(annotated_img, format="rgb24")

    def get_average_fps(self) -> float:
        """Calculate average FPS based on the recorded processing times."""
        if not self.processing_times:
            return 0.0
        avg_time = sum(self.processing_times) / len(self.processing_times)
        return 1.0 / avg_time if avg_time > 0 else 0.0


def _render_load_diagnostics() -> None:
    """Show debugging information after the model or detector failed to load."""
    st.markdown("---")
    st.markdown('', unsafe_allow_html=True)
    st.write("**Current Directory:**", os.getcwd())
    st.write("**Files in Directory:**")
    for name in os.listdir():
        if name.endswith(('.h5', '.keras', '.xml', '.prototxt', '.caffemodel')):
            st.write(f"- {name}")

    # Show system information
    st.write("**System Information:**")
    st.write(f"- Python Version: {sys.version}")
    st.write(f"- TensorFlow Version: {tf.__version__}")
    st.write(f"- OpenCV Version: {cv2.__version__}")

    # Check model file integrity (only if a copy exists in the working directory)
    model_path = MODEL_FILENAME
    if os.path.exists(model_path):
        st.write("\n**Model File Information:**")
        st.write(f"- File size: {os.path.getsize(model_path) / (1024*1024):.2f} MB")
        st.write("- File exists: Yes")
        # Try to read the file as HDF5
        try:
            with h5py.File(model_path, 'r') as f:
                st.write("- HDF5 file: Valid")
                st.write(f"- Root keys: {list(f.keys())}")
        except Exception as e:
            st.write(f"- HDF5 file: Invalid - {str(e)}")

    # Check Haar cascade file
    st.write("\n**Haar Cascade File Information:**")
    if os.path.exists(HAAR_CASCADE_FILE):
        st.write(f"- File size: {os.path.getsize(HAAR_CASCADE_FILE) / 1024:.2f} KB")
        st.write("- File exists: Yes")
    else:
        st.write("- File exists: No")

    # Check DNN model files
    st.write("\n**DNN Model Files Information:**")
    for name in DNN_MODEL_FILES:
        if os.path.exists(name):
            st.write(f"- {name}: Exists ({os.path.getsize(name) / (1024*1024):.2f} MB)")
        else:
            st.write(f"- {name}: Not found")


def _render_sidebar() -> Dict[str, Any]:
    """Render the sidebar controls and return the chosen settings."""
    with st.sidebar:
        st.markdown('', unsafe_allow_html=True)

        video_size = st.selectbox(
            "Video Size",
            options=["640x480", "1280x720", "1920x1080"],
            index=0,
            help="Select the resolution for the video stream"
        )
        fps = st.slider(
            "Frames Per Second (FPS)",
            min_value=5, max_value=30, value=15, step=1,
            help="Adjust the frame rate for video processing"
        )
        mirror_video = st.checkbox(
            "Mirror Video",
            value=False,
            help="Flip the video horizontally"
        )
        confidence_threshold = st.slider(
            "Confidence Threshold",
            min_value=0.1, max_value=0.9, value=0.5, step=0.05,
            help="Minimum confidence score for detections"
        )

        # Face detection parameters
        st.markdown("---")
        st.markdown('', unsafe_allow_html=True)
        scale_factor = st.slider(
            "Scale Factor",
            min_value=1.01, max_value=1.5, value=1.1, step=0.01,
            help="Parameter specifying how much the image size is reduced at each image scale"
        )
        min_neighbors = st.slider(
            "Min Neighbors",
            min_value=1, max_value=10, value=5, step=1,
            help="Parameter specifying how many neighbors each candidate rectangle should have to retain it"
        )

    # Parse video size
    width, height = map(int, video_size.split('x'))
    return {
        "size": (width, height),
        "fps": fps,
        "mirror": mirror_video,
        "confidence_threshold": confidence_threshold,
        "scale_factor": scale_factor,
        "min_neighbors": min_neighbors,
    }


def main():
    """Main function to run the Streamlit app.

    Loads the model and face detector, renders the controls and starts the
    WebRTC stream. Any exception is reported in the page instead of raised.
    """
    # NOTE(review): several st.markdown payloads in this app are empty or
    # whitespace-only in the source as received (their HTML appears to have
    # been stripped). They are kept as-is; restore the markup if available.
    try:
        # Header
        st.markdown("\n\n😷 Face Mask Detection\n\n", unsafe_allow_html=True)
        st.markdown(
            "\n\nReal-time face mask detection using a Keras model. The system detects faces and classifies whether they are wearing a mask or not.\n\n",
            unsafe_allow_html=True
        )

        # Load model and face detector
        mask_model = load_model()
        load_face_detector()

        # Check if models loaded successfully
        if not model_loaded or not face_detector_loaded:
            st.error("Failed to load the model or face detector. Please check the files and try again.")
            _render_load_diagnostics()
            return

        settings = _render_sidebar()
        width, height = settings["size"]

        # Main content area
        col1, col2 = st.columns([2, 1])

        with col1:
            st.markdown("\n", unsafe_allow_html=True)

            # WebRTC streamer
            webrtc_ctx = webrtc_streamer(
                key="face-mask-detection",
                mode=WebRtcMode.SENDRECV,
                video_processor_factory=lambda: FaceMaskProcessor(
                    mask_model,
                    (width, height),
                    settings["confidence_threshold"],
                    settings["mirror"],
                    settings["scale_factor"],
                    settings["min_neighbors"],
                ),
                media_stream_constraints={
                    "video": {
                        "width": {"ideal": width},
                        "height": {"ideal": height},
                        "frameRate": {"ideal": settings["fps"]}
                    },
                    "audio": False
                },
                async_processing=True,
            )

            # The factory only runs when the stream starts, so push the current
            # sidebar values to an already running processor on every rerun.
            processor = webrtc_ctx.video_processor
            if processor:
                processor.target_size = (width, height)
                processor.confidence_threshold = settings["confidence_threshold"]
                processor.mirror = settings["mirror"]
                processor.scale_factor = settings["scale_factor"]
                processor.min_neighbors = settings["min_neighbors"]

            st.markdown("\n", unsafe_allow_html=True)

            # Instructions
            st.info("\n".join([
                "",
                "**Instructions:**",
                '1. Click "START" to begin video streaming',
                "2. Allow camera access when prompted",
                "3. The system will detect faces and classify mask usage in real-time",
                "4. Green boxes = With mask, Red boxes = Without mask",
                "",
            ]))

        with col2:
            st.markdown('', unsafe_allow_html=True)

            # Create legend cards
            st.markdown(
                "\nMask\n\nPerson is wearing a mask\n\n"
                "No Mask\n\nPerson is not wearing a mask\n\n",
                unsafe_allow_html=True
            )

            # Show face detector status
            st.markdown("---")
            st.markdown('', unsafe_allow_html=True)
            if use_dnn_detector:
                st.success("Using DNN face detection (more accurate)")
            else:
                st.success("Using Haar cascade face detection (faster)")

        # System information at the bottom
        st.markdown("---")
        st.markdown(
            (
                "\n\nSystem Information\n\n"
                "TensorFlow Version: {tf_version}\n\n"
                "Model: {model_name} (Loaded from Hugging Face)\n\n"
                "Face Detector: {detector_status}\n\n"
                "Device: CPU\n\n"
            ).format(
                tf_version=tf.__version__,
                model_name="mask_detection_model.h5",
                detector_status="DNN" if use_dnn_detector else ("Haar Cascade" if face_detector_loaded else "Failed to load")
            ),
            unsafe_allow_html=True
        )

        # Footer
        st.markdown(
            '',
            unsafe_allow_html=True
        )

    except Exception as e:
        st.error(f"An error occurred: {str(e)}")
        st.error(f"Error details: {traceback.format_exc()}")


if __name__ == "__main__":
    main()