import logging
import os
import time
from io import BytesIO  # In-memory byte streams for downloaded images

import av
import cv2
import numpy as np
import requests
import streamlit as st
import yt_dlp
from mtcnn import MTCNN  # Face detection
from PIL import Image, ImageDraw  # Image processing
from streamlit_webrtc import WebRtcMode, webrtc_streamer
from transformers import pipeline  # Hugging Face inference pipeline

from utils.download import download_file
from utils.turn import get_ice_servers

# CHANGE CODE BELOW HERE, USE TO REPLACE WITH YOUR WANTED ANALYSIS.
# Appropriate imports needed for the analysis belong with the imports above.

# Initialize MTCNN for face detection
mtcnn = MTCNN()

# Initialize the Hugging Face pipeline for facial emotion detection
emotion_pipeline = pipeline(
    "image-classification",
    model="trpakov/vit-face-expression",
)

# Update the string below to set the display title of the analysis.
# Default title - "Facial Sentiment Analysis"
ANALYSIS_TITLE = "Facial Sentiment Analysis"

# CHANGE THE CONTENTS OF THIS FUNCTION, USE TO REPLACE WITH YOUR WANTED ANALYSIS.
#
#
# Function to analyze an input frame and generate an analyzed frame
# This function takes an input video frame, detects faces in it using MTCNN,
# then for each detected face, it analyzes the sentiment (emotion) using the
# analyze_sentiment function, draws a rectangle around the face, and overlays
# the detected emotion on the frame.
# It also records the time taken to process the frame and stores it in a
# global container.
# Constants for text and line size in the output image
TEXT_SIZE = 1
LINE_SIZE = 2


# Set analysis results in img_container and result queue for display
# img_container["input"] - holds the input frame contents - of type np.ndarray
# img_container["analyzed"] - holds the analyzed frame with any added annotations - of type np.ndarray
# img_container["analysis_time"] - holds how long the analysis has taken in milliseconds
# img_container["detections"] - holds the analysis metadata results
def analyze_frame(frame: np.ndarray):
    """Detect faces in ``frame``, label each with an emotion and publish the results.

    Nothing is returned; the outcome is written to the module-level
    ``img_container``: the untouched input under ``"input"``, the annotated
    copy under ``"analyzed"``, the elapsed time in milliseconds (rounded to
    two decimals) under ``"analysis_time"`` and the detector output under
    ``"detections"``. Each detection dict gains a ``"label"`` key holding the
    value returned by ``analyze_sentiment`` (``None`` when the reported box
    has no overlap with the frame).

    Args:
        frame: Image to analyze. The callers in this file pass RGB arrays.
    """
    start_time = time.time()  # Start timing the analysis

    img_container["input"] = frame  # Store the input frame
    annotated = frame.copy()  # All drawing happens on this copy
    frame_height, frame_width = frame.shape[:2]

    results = mtcnn.detect_faces(frame)  # Detect faces in the frame
    for result in results:
        x, y, w, h = result["box"]  # Bounding box of the detected face

        # The detector's box is not guaranteed to lie inside the image. A
        # negative start would be read by NumPy as "count from the end" and
        # yield a wrong or empty crop, so clip the box to the frame first.
        left, top = max(x, 0), max(y, 0)
        right, bottom = min(x + w, frame_width), min(y + h, frame_height)
        if right <= left or bottom <= top:
            result["label"] = None  # Nothing of this box is visible
            continue

        # Crop from the pristine input so annotations already drawn for
        # earlier faces never leak into the pixels given to the classifier.
        face = frame[top:bottom, left:right]

        # Analyze the sentiment of the face
        sentiment = analyze_sentiment(face)
        result["label"] = sentiment

        # Draw a rectangle around the face
        cv2.rectangle(
            annotated, (left, top), (right, bottom), (0, 0, 255), LINE_SIZE
        )

        text_width, text_height = cv2.getTextSize(
            sentiment, cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, 2
        )[0]
        text_x = left
        # Normally the label sits 10 px above the box; when the face touches
        # the top edge push it down just enough to stay inside the image.
        text_y = max(top - 10, text_height)
        background_tl = (text_x, text_y - text_height)
        background_br = (text_x + text_width, text_y + 5)

        # Draw a black background for the text
        cv2.rectangle(
            annotated, background_tl, background_br, (0, 0, 0), cv2.FILLED
        )

        # Put the sentiment text on the image
        cv2.putText(
            annotated,
            sentiment,
            (text_x, text_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            TEXT_SIZE,
            (255, 255, 255),
            2,
        )

    end_time = time.time()  # End timing the analysis
    # Calculate execution time in milliseconds
    execution_time_ms = round((end_time - start_time) * 1000, 2)

    img_container["analysis_time"] = execution_time_ms  # Store the execution time
    img_container["detections"] = results  # Store the detections
    img_container["analyzed"] = annotated  # Store the analyzed frame


# Function to
# analyze the sentiment (emotion) of a detected face
# This function wraps the face crop in a PIL image, runs the pre-trained
# emotion detection model on it, and returns the most dominant emotion.
def analyze_sentiment(face):
    """Return the label of the highest-scoring emotion for a face crop.

    Args:
        face: Face region as an array in RGB channel order. Every caller in
            this file hands ``analyze_frame`` RGB frames (``rgb24`` webcam
            frames, ``convert("RGB")`` images, BGR->RGB converted video
            frames), so the crop arrives as RGB.

    Returns:
        The ``"label"`` of the prediction with the highest ``"score"``.
    """
    # No colour conversion here: the crop is already RGB, and applying
    # COLOR_BGR2RGB to it would swap red and blue before the model sees it.
    pil_image = Image.fromarray(face)  # Convert to PIL image
    predictions = emotion_pipeline(pil_image)  # Run emotion detection
    best = max(predictions, key=lambda prediction: prediction["score"])
    return best["label"]  # Return the dominant emotion


#
#
# DO NOT TOUCH THE BELOW CODE (NOT NEEDED)
#
#

# Suppress FFmpeg logs
os.environ["FFMPEG_LOG_LEVEL"] = "quiet"

# Suppress Streamlit logs using the logging module
logging.getLogger("streamlit").setLevel(logging.ERROR)

# Container to hold image data and analysis results
img_container = {
    "input": None,
    "analyzed": None,
    "analysis_time": None,
    "detections": None,
}

# Logger for debugging and information
logger = logging.getLogger(__name__)


# Callback function to process video frames
# This function is called for each video frame in the WebRTC stream.
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Analyze one WebRTC frame and hand the original frame back unchanged.

    The frame is converted to an ``rgb24`` NumPy array and passed to
    ``analyze_frame``, which stores its results in ``img_container``.
    """
    img = frame.to_ndarray(format="rgb24")  # Frame as an RGB numpy array
    analyze_frame(img)  # Analyze the frame
    return frame  # Return the original frame


# Get ICE servers for WebRTC
ice_servers = get_ice_servers()

# Streamlit UI configuration
st.set_page_config(layout="wide")

# Custom CSS for the Streamlit page (the markup block is currently blank)
st.markdown(
    """ """,
    unsafe_allow_html=True,
)

# Streamlit page title and subtitle
st.title("Computer Vision Playground")

# Pointer to the README file
st.markdown(
    """

See the README to learn how to use this code to help you start your computer vision exploration.

""",
    unsafe_allow_html=True,
)

st.subheader(ANALYSIS_TITLE)

# Columns for input and output streams
col1, col2 = st.columns(2)

# NOTE(review): the source formatting was lost; all input widgets are assumed
# to belong to the left column and the footer to the full page - confirm.
with col1:
    st.header("Input Stream")
    input_subheader = st.empty()
    input_placeholder = st.empty()  # Placeholder for input frame

    st.subheader("Input Options")
    # WebRTC streamer to get video input from the webcam
    webrtc_ctx = webrtc_streamer(
        key="input-webcam",
        mode=WebRtcMode.SENDONLY,
        rtc_configuration=ice_servers,
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    # File uploader for images
    st.subheader("Upload an Image")
    uploaded_file = st.file_uploader(
        "Choose an image...", type=["jpg", "jpeg", "png"]
    )

    # Text input for image URL
    st.subheader("Or Enter Image URL")
    image_url = st.text_input("Image URL")

    # Text input for YouTube URL
    st.subheader("Enter a YouTube URL")
    youtube_url = st.text_input("YouTube URL")

    # File uploader for videos
    st.subheader("Upload a Video")
    uploaded_video = st.file_uploader(
        "Choose a video...", type=["mp4", "avi", "mov", "mkv"]
    )

    # Text input for video URL
    st.subheader("Or Enter Video Download URL")
    video_url = st.text_input("Video URL")

# Streamlit footer
st.markdown(
    """

If you want to set up your own computer vision playground see here.

""",
    unsafe_allow_html=True,
)

# Set once the analysis column has been built during the current script run.
_analysis_ui_ready = False


# Function to initialize the analysis UI
# This function sets up the placeholders and UI elements in the analysis section.
def analysis_init():
    """Build the analysis column and expose its placeholders as module globals.

    Creates the output-frame placeholder, the analysis-time placeholder, the
    "Show the detected labels" checkbox and the labels placeholder inside
    ``col2``, and titles the input frame area.

    The function is idempotent within one script run: several input sources
    can be active at once (for example an uploaded image plus a YouTube URL),
    and building the un-keyed checkbox a second time would make Streamlit
    reject the duplicate widget.
    """
    global analysis_time, show_labels, labels_placeholder, output_placeholder
    global _analysis_ui_ready

    if _analysis_ui_ready:
        return
    _analysis_ui_ready = True

    with col2:
        st.header("Analysis")
        input_subheader.subheader("Input Frame")

        st.subheader("Output Frame")
        output_placeholder = st.empty()  # Placeholder for output frame
        analysis_time = st.empty()  # Placeholder for analysis time
        # Checkbox to show/hide labels
        show_labels = st.checkbox("Show the detected labels", value=True)
        labels_placeholder = st.empty()  # Placeholder for labels


# Function to publish frames and results to the Streamlit UI
# This function retrieves the latest frames and results from the global container and result queue,
# and updates the placeholders in the Streamlit UI with the current input frame, analyzed frame, analysis time, and detected labels.
def publish_frame():
    """Push the latest contents of ``img_container`` into the UI placeholders.

    Items are shown in order - input frame, analyzed frame, analysis time,
    detections - and the function stops at the first one that is still
    ``None``, so a partially filled container shows only what is available.
    The detections table is written only while the labels checkbox is ticked.
    """
    img = img_container["input"]
    if img is None:
        return
    input_placeholder.image(img, channels="RGB")  # Display the input frame

    analyzed = img_container["analyzed"]
    if analyzed is None:
        return
    output_placeholder.image(analyzed, channels="RGB")  # Display the analyzed frame

    # Named elapsed_ms rather than ``time`` so the ``time`` module is not shadowed.
    elapsed_ms = img_container["analysis_time"]
    if elapsed_ms is None:
        return
    analysis_time.text(f"Analysis Time: {elapsed_ms} ms")  # Display the analysis time

    detections = img_container["detections"]
    if detections is None:
        return
    if show_labels:
        # Display labels if the checkbox is checked
        labels_placeholder.table(detections)


# If the WebRTC streamer is playing, initialize and publish frames
if webrtc_ctx.state.playing:
    analysis_init()  # Initialize the analysis UI
    # Re-check the stream state on every pass so the loop ends when the
    # stream stops instead of spinning forever.
    while webrtc_ctx.state.playing:
        publish_frame()  # Publish the frames and results
        time.sleep(0.1)  # Delay to control frame rate

# If an image is uploaded or a URL is provided, process the image
if uploaded_file is not None or image_url:
    analysis_init()  # Initialize the analysis UI
    try:
        if uploaded_file is not None:
            image = Image.open(uploaded_file)  # Open the uploaded image
        else:
            # Download the image from the URL. The timeout keeps an
            # unresponsive host from blocking the app, and HTTP errors are
            # raised instead of handing an error page to the image decoder.
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
        img = np.array(image.convert("RGB"))  # Convert the image to RGB format
    except (requests.RequestException, OSError) as err:
        # OSError also covers PIL failing to identify or decode the data.
        st.error(f"Could not load the image: {err}")
    else:
        analyze_frame(img)  # Analyze the image
        publish_frame()  # Publish the results


# Function to process video files
# This function reads frames from a video file, analyzes each frame for face detection and sentiment analysis,
# and updates the Streamlit UI with the current input frame, analyzed frame, and detected labels.
def process_video(video_path):
    """Analyze and publish every frame of a video.

    Args:
        video_path: Anything ``cv2.VideoCapture`` accepts; this file passes a
            local file path or a stream URL.

    Frames are read until the capture reports no more frames. Each one is
    converted from BGR to RGB, passed to ``analyze_frame`` and then shown with
    ``publish_frame``. The capture is released even if analysis raises.
    """
    cap = cv2.VideoCapture(video_path)  # Open the video file
    try:
        while cap.isOpened():
            ret, frame = cap.read()  # Read a frame from the video
            if not ret:
                break  # No more frames are available

            # Convert the frame from BGR to RGB format
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Analyze the frame for face detection and sentiment analysis
            analyze_frame(rgb_frame)
            publish_frame()  # Publish the results
    finally:
        cap.release()  # Release the video capture object


# Function to get the video stream URL from YouTube using yt-dlp
def get_youtube_stream_url(youtube_url):
    """Return the direct stream URL yt-dlp resolves for ``youtube_url``.

    Only metadata is extracted (``download=False``); the ``"url"`` entry of
    the returned info dict is handed back to the caller.
    """
    ydl_opts = {
        'format': 'best[ext=mp4]',
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(youtube_url, download=False)
    return info_dict['url']


def _save_upload_to_tempfile(uploaded):
    """Copy an uploaded file into a temporary file and return that file's path."""
    import tempfile  # Local import: only needed when a video is uploaded

    # Keep the original extension on the temporary file.
    suffix = os.path.splitext(uploaded.name)[1]
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(uploaded.getbuffer())
    return tmp.name


# If a YouTube URL is provided, process the video
if youtube_url:
    analysis_init()  # Initialize the analysis UI
    stream_url = get_youtube_stream_url(youtube_url)
    process_video(stream_url)  # Process the video

# If a video is uploaded or a URL is provided, process the video
if uploaded_video is not None or video_url:
    analysis_init()  # Initialize the analysis UI

    if uploaded_video is not None:
        # Save the upload to a private temporary file rather than writing the
        # client-supplied file name into the working directory, and remove it
        # once processing has finished.
        video_path = _save_upload_to_tempfile(uploaded_video)
        try:
            process_video(video_path)  # Process the video
        finally:
            os.remove(video_path)
    else:
        # Download the video from the URL
        video_path = download_file(video_url)
        process_video(video_path)  # Process the video