import os import cv2 import streamlit as st import numpy as np from PIL import Image, ImageDraw, ImageFont import tensorflow as tf from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, WebRtcMode import av import time from typing import Tuple, List, Dict, Any import h5py from huggingface_hub import hf_hub_download import urllib.request import logging # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Set environment variables for headless operation os.environ["DISPLAY"] = ":0" os.environ["QT_QPA_PLATFORM"] = "offscreen" os.environ["OPENCV_HEADLESS"] = "1" # Set TensorFlow to use CPU only try: tf.config.set_visible_devices([], 'GPU') logger.info("TensorFlow configured to use CPU only") except Exception as e: logger.error(f"Error configuring TensorFlow for CPU: {e}") # Set page config with transparent background st.set_page_config( page_title="Face Mask Detection", page_icon="😷", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS for modern styling with transparent background st.markdown(""" """, unsafe_allow_html=True) # Global variables for model and processor model = None face_cascade = None dnn_net = None model_input_size = (128, 128) # From model config class_names = ['Mask', 'No Mask'] # From model config model_loaded = False face_detector_loaded = False use_dnn_detector = False def download_haarcascade(): """Download the Haar cascade file if it doesn't exist.""" cascade_file = "haarcascade_frontalface_default.xml" cascade_url = "https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml" if not os.path.exists(cascade_file): try: with st.spinner("Downloading Haar cascade file..."): urllib.request.urlretrieve(cascade_url, cascade_file) st.success("Haar cascade file downloaded successfully!") return True except Exception as e: st.error(f"Failed to download Haar cascade file: {str(e)}") return False return True def download_dnn_model(): """Download the DNN face detection model files.""" model_files = { "deploy.prototxt": "https://raw.githubusercontent.com/opencv/opencv/master/samples/dnn/face_detector/deploy.prototxt", "res10_300x300_ssd_iter_140000.caffemodel": "https://raw.githubusercontent.com/opencv/opencv_3rdparty/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel" } all_downloaded = True for filename, url in model_files.items(): if not os.path.exists(filename): try: with st.spinner(f"Downloading {filename}..."): urllib.request.urlretrieve(url, filename) st.success(f"{filename} downloaded successfully!") except Exception as e: st.error(f"Failed to download {filename}: {str(e)}") all_downloaded = False return all_downloaded def load_model() -> Any: """Load the Keras face mask detection model from Hugging Face with enhanced error handling.""" global model, model_loaded if model is None: model_filename = "mask_detection_model.h5" repo_id = "sreenathsree1578/face_mask_detection" try: # Download model from Hugging Face Hub with st.spinner("Downloading model from Hugging Face Hub..."): model_path = hf_hub_download(repo_id=repo_id, filename=model_filename) # Try loading with different methods # Method 1: Standard Keras load try: logger.info("Attempting to load model with standard method") model = tf.keras.models.load_model(model_path) model_loaded = True logger.info("Model loaded successfully with standard method") return model except Exception as e1: logger.error(f"Standard model loading failed: {e1}") # Method 2: Try with custom objects try: logger.info("Attempting to load model with compile=False") model = tf.keras.models.load_model(model_path, compile=False) model_loaded = True logger.info("Model loaded successfully with compile=False") return model except Exception as e2: logger.error(f"Model loading with compile=False failed: {e2}") # Method 3: Try loading as SavedModel if it's actually a directory try: logger.info("Attempting to load model as SavedModel") if os.path.isdir(model_path): model = tf.keras.models.load_model(model_path) model_loaded = True logger.info("Model loaded successfully as SavedModel") return model except Exception as e3: logger.error(f"SavedModel loading failed: {e3}") # If all methods failed model_loaded = False return None except Exception as e: logger.error(f"Error loading model from Hugging Face: {e}") st.error(f"Error loading model from Hugging Face: {str(e)}") model_loaded = False return None return model def load_face_detector(): """Load OpenCV's Haar cascade or DNN face detector.""" global face_cascade, dnn_net, face_detector_loaded, use_dnn_detector # First try to load Haar cascade if not use_dnn_detector: # Download the Haar cascade file if needed if not download_haarcascade(): logger.info("Haar cascade download failed, trying DNN detector") use_dnn_detector = True return load_face_detector() try: # Load the pre-trained Haar cascade classifier from local file face_cascade = cv2.CascadeClassifier("haarcascade_frontalface_default.xml") # Check if the cascade was loaded successfully if face_cascade.empty(): logger.info("Haar cascade is empty, trying DNN detector") use_dnn_detector = True return load_face_detector() face_detector_loaded = True use_dnn_detector = False return True except Exception as e: logger.error(f"Error loading Haar cascade: {e}") use_dnn_detector = True return load_face_detector() # If we're here, we need to use the DNN detector if not download_dnn_model(): face_detector_loaded = False return False try: # Load the DNN model dnn_net = cv2.dnn.readNetFromCaffe("deploy.prototxt", "res10_300x300_ssd_iter_140000.caffemodel") face_detector_loaded = True use_dnn_detector = True return True except Exception as e: logger.error(f"Error loading DNN model: {e}") face_detector_loaded = False return False def detect_faces_haar(image: np.ndarray) -> List[Tuple[int, int, int, int]]: """Detect faces using Haar cascade.""" # Convert to grayscale for face detection gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Detect faces faces = face_cascade.detectMultiScale( gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE ) # Convert to list of tuples (x, y, w, h) return [(x, y, w, h) for (x, y, w, h) in faces] def detect_faces_dnn(image: np.ndarray) -> List[Tuple[int, int, int, int]]: """Detect faces using DNN model.""" # Get image dimensions (h, w) = image.shape[:2] # Create a blob from the image blob = cv2.dnn.blobFromImage(cv2.resize(image, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0)) # Pass the blob through the network and get the detections dnn_net.setInput(blob) detections = dnn_net.forward() faces = [] # Loop over the detections for i in range(0, detections.shape[2]): # Extract the confidence (i.e., probability) associated with the prediction confidence = detections[0, 0, i, 2] # Filter out weak detections by ensuring the confidence is greater than a minimum threshold if confidence > 0.5: # Compute the (x, y)-coordinates of the bounding box for the object box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) (startX, startY, endX, endY) = box.astype("int") # Add to faces list faces.append((startX, startY, endX - startX, endY - startY)) return faces def detect_faces(image: np.ndarray) -> List[Tuple[int, int, int, int]]: """Detect faces in the image using Haar cascade or DNN method.""" if use_dnn_detector: return detect_faces_dnn(image) else: return detect_faces_haar(image) def preprocess_image(image: np.ndarray) -> np.ndarray: """Preprocess image for model inference.""" # Resize to model input size resized = cv2.resize(image, model_input_size) # Normalize to [0,1] normalized = resized.astype(np.float32) / 255.0 # Add batch dimension return np.expand_dims(normalized, axis=0) def classify_faces(image: np.ndarray, faces: List[Tuple[int, int, int, int]], confidence_threshold: float = 0.5) -> List[Dict]: """Classify each detected face as mask or no mask.""" detections = [] if not model_loaded: # If model is not loaded, return empty detections return detections for (x, y, w, h) in faces: # Extract face ROI face_roi = image[y:y+h, x:x+w] # Skip if face ROI is empty if face_roi.size == 0: continue # Preprocess the face ROI processed_face = preprocess_image(face_roi) # Classify the face try: predictions = model.predict(processed_face, verbose=0) # Get the class with the highest probability class_id = np.argmax(predictions[0]) confidence = float(predictions[0][class_id]) # Only add detection if confidence is above threshold if confidence >= confidence_threshold: detections.append({ "label": class_names[class_id], "score": confidence, "box": {"xmin": x, "ymin": y, "xmax": x + w, "ymax": y + h} }) except Exception as e: logger.warning(f"Error classifying face: {e}") st.warning(f"Error classifying face: {str(e)}") return detections def draw_detections(image: np.ndarray, detections: List[Dict]) -> np.ndarray: """ Draw bounding boxes and labels on the image. Args: image: Input image as numpy array detections: List of detection dictionaries Returns: Annotated image as numpy array """ # Convert numpy array to PIL Image pil_image = Image.fromarray(image) draw = ImageDraw.Draw(pil_image) # Try to load a font, fall back to default if not available try: font = ImageFont.truetype("arial.ttf", 16) except: font = ImageFont.load_default() # Define colors for different classes colors = { "Mask": (0, 255, 0), # Green "No Mask": (255, 0, 0), # Red } for detection in detections: try: # Get bounding box coordinates box = detection["box"] xmin, ymin, xmax, ymax = box["xmin"], box["ymin"], box["xmax"], box["ymax"] # Get label and confidence label = detection["label"] confidence = detection["score"] # Get color based on label color = colors.get(label, (0, 0, 255)) # Default to blue if label not found # Draw bounding box draw.rectangle([(xmin, ymin), (xmax, ymax)], outline=color, width=3) # Create label text with confidence label_text = f"{label}: {confidence:.2%}" # Get text size text_bbox = draw.textbbox((0, 0), label_text, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] # Draw filled rectangle for text background draw.rectangle( [(xmin, ymin - text_height - 5), (xmin + text_width + 10, ymin - 5)], fill=color ) # Draw text draw.text((xmin + 5, ymin - text_height - 5), label_text, fill="white", font=font) except Exception as e: logger.warning(f"Error drawing detection: {e}") st.warning(f"Error drawing detection: {str(e)}") # Convert back to numpy array return np.array(pil_image) class FaceMaskProcessor(VideoProcessorBase): """Video processor class for real-time face mask detection.""" def __init__(self, model: Any, target_size: Tuple[int, int] = (640, 480), confidence_threshold: float = 0.5, mirror: bool = False): self.model = model self.target_size = target_size self.confidence_threshold = confidence_threshold self.mirror = mirror self.frame_count = 0 self.processing_times = [] def recv(self, frame: av.VideoFrame) -> av.VideoFrame: """Process incoming video frame.""" start_time = time.time() # Convert frame to numpy array img = frame.to_ndarray(format="bgr24") # Mirror the image if requested if self.mirror: img = cv2.flip(img, 1) # Resize frame if needed if img.shape[:2][::-1] != self.target_size: img = cv2.resize(img, self.target_size) # Detect faces faces = detect_faces(img) # Classify each detected face detections = classify_faces(img, faces, self.confidence_threshold) # Draw detections on frame annotated_img = draw_detections(img, detections) # Calculate processing time processing_time = time.time() - start_time self.processing_times.append(processing_time) if len(self.processing_times) > 30: # Keep last 30 measurements self.processing_times.pop(0) self.frame_count += 1 # Convert back to VideoFrame return av.VideoFrame.from_ndarray(annotated_img, format="bgr24") def get_average_fps(self) -> float: """Calculate average FPS based on processing times.""" if not self.processing_times: return 0.0 avg_time = sum(self.processing_times) / len(self.processing_times) return 1.0 / avg_time if avg_time > 0 else 0.0 def main(): """Main function to run the Streamlit app.""" try: # Header st.markdown('
Real-time face mask detection using a Keras model. The system detects faces and classifies whether they are wearing a mask or not.
', unsafe_allow_html=True) # Load model and face detector model = load_model() load_face_detector() # Check if models loaded successfully if not model_loaded or not face_detector_loaded: st.error("Failed to load the model or face detector. Please check the files and try again.") # Additional debugging information st.markdown("---") st.markdown('Person is wearing a mask
Person is not wearing a mask
TensorFlow Version: {tf_version}
Model: {model_name} (Loaded from Hugging Face)
Face Detector: {detector_status}
Device: CPU