import time
import os
import logging

import av
import cv2
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from utils.download import download_file
from utils.turn import get_ice_servers

from mtcnn import MTCNN
from PIL import Image, ImageDraw
from transformers import pipeline

import requests
from io import BytesIO
import yt_dlp


# Face detector used to locate faces in each frame
mtcnn = MTCNN()

# Hugging Face image-classification pipeline for facial expression recognition
emotion_pipeline = pipeline(
    "image-classification", model="trpakov/vit-face-expression"
)

# Title shown above the analysis output
ANALYSIS_TITLE = "Facial Sentiment Analysis"

# Font scale and line thickness for the annotations drawn on each frame
TEXT_SIZE = 1
LINE_SIZE = 2


def analyze_frame(frame: np.ndarray):
    # Detect faces, classify each one's expression, and draw the results onto
    # a copy of the frame. Results are shared with the rendering loop through
    # img_container.
    start_time = time.time()
    img_container["input"] = frame
    frame = frame.copy()

    results = mtcnn.detect_faces(frame)
    for result in results:
        x, y, w, h = result["box"]
        # MTCNN can return slightly negative coordinates near the frame edge
        x, y = max(0, x), max(0, y)
        face = frame[y: y + h, x: x + w]

        sentiment = analyze_sentiment(face)
        result["label"] = sentiment

        # Draw the bounding box and a filled background behind the label text
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), LINE_SIZE)
        text_size = cv2.getTextSize(
            sentiment, cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, 2
        )[0]
        text_x = x
        text_y = y - 10
        background_tl = (text_x, text_y - text_size[1])
        background_br = (text_x + text_size[0], text_y + 5)
        cv2.rectangle(frame, background_tl, background_br, (0, 0, 0), cv2.FILLED)

        cv2.putText(
            frame,
            sentiment,
            (text_x, text_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            TEXT_SIZE,
            (255, 255, 255),
            2,
        )

    end_time = time.time()
    execution_time_ms = round((end_time - start_time) * 1000, 2)

    img_container["analysis_time"] = execution_time_ms
    img_container["detections"] = results
    img_container["analyzed"] = frame


def analyze_sentiment(face):
    # Frames reaching analyze_frame are already RGB, so the face crop can be
    # passed straight to the classifier without a colour-space conversion.
    pil_image = Image.fromarray(face)
    results = emotion_pipeline(pil_image)
    dominant_emotion = max(results, key=lambda x: x["score"])["label"]
    return dominant_emotion


# Quiet ffmpeg and Streamlit logging
os.environ["FFMPEG_LOG_LEVEL"] = "quiet"
logging.getLogger("streamlit").setLevel(logging.ERROR)

# Shared container that passes frames and results from the video callback
# thread to the Streamlit rendering loop
img_container = {
    "input": None,
    "analyzed": None,
    "analysis_time": None,
    "detections": None,
}

logger = logging.getLogger(__name__)


def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    # Runs on the WebRTC worker thread: convert the frame to an RGB array,
    # analyze it, and return the original frame unchanged.
    img = frame.to_ndarray(format="rgb24")
    analyze_frame(img)
    return frame


# STUN/TURN servers for the WebRTC connection
ice_servers = get_ice_servers()


st.set_page_config(layout="wide")

st.markdown(
    """
    <style>
    .main {
        padding: 2rem;
    }
    h1, h2, h3 {
        font-family: 'Arial', sans-serif;
    }
    h1 {
        font-weight: 700;
        font-size: 2.5rem;
    }
    h2 {
        font-weight: 600;
        font-size: 2rem;
    }
    h3 {
        font-weight: 500;
        font-size: 1.5rem;
    }
    </style>
    """,
    unsafe_allow_html=True,
)


st.title("Computer Vision Playground")

st.markdown(
    """
    <div style="text-align: left;">
        <p>See the <a href="https://huggingface.co/spaces/eusholli/sentiment-analyzer/blob/main/README.md"
        target="_blank">README</a> to learn how to use this code to help you start your computer vision exploration.</p>
    </div>
    """,
    unsafe_allow_html=True,
)

st.subheader(ANALYSIS_TITLE)

# Two-column layout: input sources on the left, analysis output on the right
col1, col2 = st.columns(2)

with col1:
    st.header("Input Stream")
    input_subheader = st.empty()
    input_placeholder = st.empty()
    st.subheader("Input Options")

    # Live webcam input over WebRTC
    webrtc_ctx = webrtc_streamer(
        key="input-webcam",
        mode=WebRtcMode.SENDONLY,
        rtc_configuration=ice_servers,
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    st.subheader("Upload an Image")
    uploaded_file = st.file_uploader(
        "Choose an image...", type=["jpg", "jpeg", "png"]
    )

    st.subheader("Or Enter Image URL")
    image_url = st.text_input("Image URL")

    st.subheader("Enter a YouTube URL")
    youtube_url = st.text_input("YouTube URL")

    st.subheader("Upload a Video")
    uploaded_video = st.file_uploader(
        "Choose a video...", type=["mp4", "avi", "mov", "mkv"]
    )

    st.subheader("Or Enter Video Download URL")
    video_url = st.text_input("Video URL")

    st.markdown(
        """
        <div style="text-align: center; margin-top: 2rem;">
            <p>If you want to set up your own computer vision playground see <a href="https://huggingface.co/spaces/eusholli/computer-vision-playground/blob/main/README.md" target="_blank">here</a>.</p>
        </div>
        """,
        unsafe_allow_html=True,
    )


def analysis_init():
    # Build the analysis (right-hand) column once an input source is active.
    # The placeholders are module-level so publish_frame can update them.
    global analysis_time, show_labels, labels_placeholder, input_subheader, input_placeholder, output_placeholder

    with col2:
        st.header("Analysis")
        input_subheader.subheader("Input Frame")

        st.subheader("Output Frame")
        output_placeholder = st.empty()
        analysis_time = st.empty()
        show_labels = st.checkbox(
            "Show the detected labels", value=True
        )
        labels_placeholder = st.empty()


def publish_frame():
    # Push the latest input frame, annotated frame, timing, and detections
    # from img_container into the Streamlit placeholders.
    img = img_container["input"]
    if img is None:
        return
    input_placeholder.image(img, channels="RGB")

    analyzed = img_container["analyzed"]
    if analyzed is None:
        return
    output_placeholder.image(analyzed, channels="RGB")

    # Use a distinct name so the time module is not shadowed
    analysis_time_ms = img_container["analysis_time"]
    if analysis_time_ms is None:
        return
    analysis_time.text(f"Analysis Time: {analysis_time_ms} ms")

    detections = img_container["detections"]
    if detections is None:
        return

    if show_labels:
        labels_placeholder.table(detections)


# Continuously refresh the UI while the webcam stream is live
if webrtc_ctx.state.playing:
    analysis_init()
    while webrtc_ctx.state.playing:
        publish_frame()
        time.sleep(0.1)


# Analyze a single uploaded image or an image fetched from a URL
if uploaded_file is not None or image_url:
    analysis_init()

    if uploaded_file is not None:
        image = Image.open(uploaded_file)
        img = np.array(image.convert("RGB"))
    else:
        response = requests.get(image_url)
        image = Image.open(BytesIO(response.content))
        img = np.array(image.convert("RGB"))

    analyze_frame(img)
    publish_frame()


def process_video(video_path):
    # Read a video frame by frame, analyze each frame, and publish the
    # results as they are produced.
    cap = cv2.VideoCapture(video_path)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV decodes frames as BGR; convert to RGB before analysis
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        analyze_frame(rgb_frame)
        publish_frame()

    cap.release()


def get_youtube_stream_url(youtube_url):
    # Resolve a direct MP4 stream URL for a YouTube video with yt-dlp,
    # without downloading the file.
    ydl_opts = {
        "format": "best[ext=mp4]",
        "quiet": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(youtube_url, download=False)
        stream_url = info_dict["url"]
    return stream_url


# Analyze a YouTube video given its URL
if youtube_url:
    analysis_init()

    stream_url = get_youtube_stream_url(youtube_url)
    process_video(stream_url)


# Analyze an uploaded video file or a video fetched from a download URL
if uploaded_video is not None or video_url:
    analysis_init()

    if uploaded_video is not None:
        # Save the upload locally so OpenCV can open it by path
        video_path = uploaded_video.name
        with open(video_path, "wb") as f:
            f.write(uploaded_video.getbuffer())
    else:
        video_path = download_file(video_url)

    process_video(video_path)
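

# A minimal way to try this playground locally (assuming this file is saved
# as app.py and the utils/ helpers plus the model dependencies are installed):
#
#   streamlit run app.py
#
# The page then offers the input options above (webcam, image, video, YouTube).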