"""Streamlit demo app: live face detection and recognition on a webcam stream.

The script builds a sidebar with preferences, loads (and caches per session)
the face detection and face recognition models, streams the webcam through
``streamlit_webrtc`` with ``video_frame_callback`` applied to every frame,
and continuously displays per-frame statistics and detections.
"""

import streamlit as st
import time
from typing import List
from streamlit_webrtc import webrtc_streamer, WebRtcMode
import logging
import mediapipe as mp
import tflite_runtime.interpreter as tflite
import av
import numpy as np
import queue
from streamlit_toggle import st_toggle_switch
import pandas as pd
from tools.nametypes import Stats, Detection
from pathlib import Path
from tools.utils import get_ice_servers, download_file, display_match, rgb, format_dflist
from tools.face_recognition import (
    detect_faces,
    align_faces,
    inference,
    draw_detections,
    recognize_faces,
    process_gallery,
)


# Set logging level to error (To avoid getting spammed by queue warnings etc.)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.ERROR)


ROOT = Path(__file__).parent

MODEL_URL = (
    "https://github.com/Martlgap/FaceIDLight/releases/download/v.0.1/mobileNet.tflite"
)
MODEL_LOCAL_PATH = ROOT / "./models/mobileNet.tflite"

DETECTION_CONFIDENCE = 0.5
TRACKING_CONFIDENCE = 0.5
MAX_FACES = 2


def _session_cached(key, factory, params=None):
    """Return the object cached in ``st.session_state[key]``, building it if needed.

    The object is (re)built by calling ``factory()`` when nothing is cached
    under ``key`` yet, or when ``params`` differs from the parameters that were
    recorded when the cached object was built. The parameters are stored under
    the separate key ``f"{key}_params"`` so that ``st.session_state[key]`` keeps
    holding the bare object.

    Args:
        key: Session-state key under which the object is stored.
        factory: Zero-argument callable that creates the object.
        params: Comparable value describing the settings the object depends on;
            ``None`` for objects that never need rebuilding.

    Returns:
        The cached or freshly created object.
    """
    params_key = f"{key}_params"
    has_params = params_key in st.session_state
    recorded_params = st.session_state[params_key] if has_params else None
    if key not in st.session_state or recorded_params != params:
        # NOTE(review): a replaced object is only dropped, not explicitly
        # closed, because a frame callback may still be using it on another
        # thread -- confirm this is acceptable for the model types used here.
        st.session_state[key] = factory()
        st.session_state[params_key] = params
    return st.session_state[key]


# Set page layout for streamlit to wide
st.set_page_config(
    layout="wide", page_title="FaceID App Demo", page_icon=":sunglasses:"
)

with st.sidebar:
    st.markdown("# Preferences")
    face_rec_on = st_toggle_switch(
        "Face Recognition",
        key="activate_face_rec",
        default_value=True,
        active_color=rgb(255, 75, 75),
        track_color=rgb(50, 50, 50),
    )

    st.markdown("## Webcam & Stream")
    resolution = st.selectbox(
        "Webcam Resolution",
        [(1920, 1080), (1280, 720), (640, 360)],
        index=2,
    )
    st.markdown("Note: To change the resolution, you have to restart the stream.")

    ice_server = st.selectbox("ICE Server", ["twilio", "metered"], index=0)
    st.markdown(
        "Note: metered is a free server with limited bandwidth, and can take a while to connect. Twilio is a paid service and is payed by me, so please don't abuse it."
    )

    st.markdown("## Face Detection")
    max_faces = st.number_input("Maximum Number of Faces", value=2, min_value=1)
    detection_confidence = st.slider(
        "Min Detection Confidence", min_value=0.0, max_value=1.0, value=0.5
    )
    tracking_confidence = st.slider(
        "Min Tracking Confidence", min_value=0.0, max_value=1.0, value=0.9
    )

    st.markdown("## Face Recognition")
    similarity_threshold = st.slider(
        "Similarity Threshold", min_value=0.0, max_value=2.0, value=0.67
    )
    st.markdown(
        "This sets a maximum distance for the cosine similarity between the embeddings of the detected face and the gallery images. If the distance is below the threshold, the face is recognized as the gallery image with the lowest distance. If the distance is above the threshold, the face is not recognized."
    )

download_file(
    MODEL_URL,
    MODEL_LOCAL_PATH,
    file_hash="6c19b789f661caa8da735566490bfd8895beffb2a1ec97a56b126f0539991aa6",
)


def _build_recognition_model():
    """Create a TFLite interpreter for the downloaded face recognition model."""
    return tflite.Interpreter(model_path=MODEL_LOCAL_PATH.as_posix())


def _build_detection_model():
    """Create a MediaPipe FaceMesh configured from the current sidebar settings."""
    return mp.solutions.face_mesh.FaceMesh(
        refine_landmarks=True,
        min_detection_confidence=detection_confidence,
        min_tracking_confidence=tracking_confidence,
        max_num_faces=max_faces,
    )


# The detection models depend on these sidebar values; recording them with the
# cache entry lets a changed slider / number input actually take effect instead
# of silently reusing the model built on the first script run.
detection_params = (detection_confidence, tracking_confidence, max_faces)

# Session-specific caching of the face recognition model (live stream)
face_recognition_model = _session_cached("face_id_model", _build_recognition_model)

# Session-specific caching of the face recognition model (gallery processing)
face_recognition_model_gal = _session_cached(
    "face_id_model_gal", _build_recognition_model
)

# Session-specific caching of the face detection model (live stream)
face_detection_model = _session_cached(
    "face_detection_model", _build_detection_model, detection_params
)

# Session-specific caching of the face detection model (gallery processing)
face_detection_model_gal = _session_cached(
    "face_detection_model_gal", _build_detection_model, detection_params
)

# Queues used to hand per-frame results from the frame callback to the
# display loop at the bottom of the script.
stats_queue: "queue.Queue[Stats]" = queue.Queue()
detections_queue: "queue.Queue[List[Detection]]" = queue.Queue()


def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Process one video frame and publish its stats and detections.

    When face recognition is switched on, the frame is passed through
    detection, alignment, inference, recognition and drawing, and the duration
    of each stage (in milliseconds) is recorded. The resulting detections and
    stats are put on ``detections_queue`` and ``stats_queue``.

    Args:
        frame: Incoming video frame.

    Returns:
        The (possibly annotated) frame as an ``av.VideoFrame`` in rgb24 format.
    """
    # Initialize detections
    detections = []

    # Initialize stats
    stats = Stats()

    # Start timer for FPS calculation. perf_counter is monotonic and
    # high-resolution, so durations cannot be negative or spuriously zero the
    # way wall-clock differences can.
    frame_start = time.perf_counter()

    # Convert frame to numpy array
    frame = frame.to_ndarray(format="rgb24")

    # Get frame resolution and add to stats
    resolution = frame.shape
    stats = stats._replace(resolution=resolution)

    if face_rec_on:
        # Run face detection
        start = time.perf_counter()
        detections = detect_faces(frame, face_detection_model)
        stats = stats._replace(num_faces=len(detections) if detections else 0)
        stats = stats._replace(detection=(time.perf_counter() - start) * 1000)

        # Run face alignment
        start = time.perf_counter()
        detections = align_faces(frame, detections)
        stats = stats._replace(alignment=(time.perf_counter() - start) * 1000)

        # Run inference
        start = time.perf_counter()
        detections = inference(detections, face_recognition_model)
        stats = stats._replace(inference=(time.perf_counter() - start) * 1000)

        # Run face recognition
        start = time.perf_counter()
        detections = recognize_faces(detections, gallery, similarity_threshold)
        stats = stats._replace(recognition=(time.perf_counter() - start) * 1000)

        # Draw detections
        start = time.perf_counter()
        frame = draw_detections(frame, detections)
        stats = stats._replace(drawing=(time.perf_counter() - start) * 1000)

    # Convert frame back to av.VideoFrame
    frame = av.VideoFrame.from_ndarray(frame, format="rgb24")

    # Calculate FPS and add to stats (guard against a zero-length interval)
    elapsed = time.perf_counter() - frame_start
    stats = stats._replace(fps=1 / elapsed if elapsed > 0 else float("inf"))

    # Send data to other thread
    detections_queue.put_nowait(detections)
    stats_queue.put_nowait(stats)

    return frame


# Streamlit app
st.title("FaceID App Demonstration")

st.sidebar.markdown("**Gallery**")
gallery = st.sidebar.file_uploader(
    "Upload images to gallery", type=["png", "jpg", "jpeg"], accept_multiple_files=True
)
if gallery:
    # Replace the uploaded files with the processed gallery identities
    gallery = process_gallery(
        gallery, face_detection_model_gal, face_recognition_model_gal
    )
    st.sidebar.markdown("**Gallery Images**")
    st.sidebar.image(
        [identity.image for identity in gallery],
        caption=[identity.name for identity in gallery],
        width=112,
    )

st.markdown("**Stats**")
stats = st.empty()

ctx = webrtc_streamer(
    key="FaceIDAppDemo",
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers(name=ice_server)},
    video_frame_callback=video_frame_callback,
    media_stream_constraints={
        "video": {
            "width": {
                "min": resolution[0],
                "ideal": resolution[0],
                "max": resolution[0],
            },
            "height": {
                "min": resolution[1],
                "ideal": resolution[1],
                "max": resolution[1],
            },
        },
        "audio": False,
    },
    async_processing=True,
)

st.markdown("**Identified Faces**")
identified_faces = st.empty()

st.markdown("**Detections**")
detections = st.empty()

# Display Live Stats
if ctx.state.playing:
    while True:
        # Get stats
        stats_data = stats_queue.get()
        stats_dataframe = pd.DataFrame([stats_data])

        # Write stats to streamlit. Styler.format returns the Styler rather
        # than modifying the DataFrame, so the Styler itself must be displayed
        # for the number formatting to show up.
        stats.dataframe(stats_dataframe.style.format(thousands=" ", precision=2))

        # Get detections
        detections_data = detections_queue.get()
        detections_dataframe = (
            pd.DataFrame(detections_data)
            .drop(columns=["face", "face_match"], errors="ignore")
            .applymap(lambda x: (format_dflist(x)))
        )

        # Write detections to streamlit
        detections.dataframe(detections_dataframe)

        # Write identified faces to streamlit (only detections that got a name)
        matches = [d for d in detections_data if d.name is not None]
        identified_faces.image(
            [display_match(d) for d in matches],
            caption=[f"{d.name}({d.distance:.2f})" for d in matches],
            width=112,
        )