import os
import sys
import asyncio
import tempfile
import traceback
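
# Redirect HOME and Streamlit's config dir to a writable location (useful when the
# default home directory is read-only, e.g. in hosted containers) and configure
# TensorFlow logging/GPU behaviour before TensorFlow is imported.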
os.environ["HOME"] = "/tmp"
os.environ["STREAMLIT_HOME"] = "/tmp"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"
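
# On Linux, make sure the current thread has a usable asyncio event loop;
# streamlit-webrtc's networking stack (aiortc) relies on asyncio.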
if sys.platform.startswith('linux'):
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())
import cv2
import numpy as np
from PIL import Image
import streamlit as st
from streamlit_webrtc import VideoProcessorBase, webrtc_streamer, RTCConfiguration
from huggingface_hub import hf_hub_download
from twilio.rest import Client
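
# WebRTC ICE configuration: default to Google's public STUN server, and switch to
# Twilio-provided TURN/STUN servers when credentials are present in the environment.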
account_sid = os.environ.get("ACCOUNT_SID")
auth_token = os.environ.get("AUTH_TOKEN")
ICE_SERVERS = [{"urls": ["stun:stun.l.google.com:19302"]}]
if account_sid and auth_token:
    try:
        twilio_client = Client(account_sid, auth_token)
        token = twilio_client.tokens.create()
        try:
            # Keep only servers that offer a UDP transport; if the filter fails or
            # removes every server, fall back to the full Twilio list.
            udp_servers = [
                server for server in token.ice_servers
                if any(
                    "udp" in url
                    for url in ([server["urls"]] if isinstance(server["urls"], str) else server["urls"])
                )
            ]
            if udp_servers:
                ICE_SERVERS = udp_servers
                st.success("✅ Using Twilio TURN/STUN servers with UDP")
            else:
                ICE_SERVERS = token.ice_servers
                st.success("✅ Using Twilio TURN/STUN servers")
        except Exception:
            ICE_SERVERS = token.ice_servers
            st.success("✅ Using Twilio TURN/STUN servers")
    except Exception as e:
        st.error(f"❌ Failed to get ICE servers from Twilio: {e}")
        st.text(traceback.format_exc())
else:
    st.warning("⚠️ Twilio credentials not set. Falling back to STUN-only.")
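
# Ask TensorFlow to grow GPU memory on demand rather than reserving it all up front.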
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except Exception as e:
        print(e)

# --- Utility functions (from utils/utils.py) ---
# These helpers must be implemented locally or be importable from the project.
from nets import get_model_from_name
from utils.utils import (cvtColor, get_classes, letterbox_image, preprocess_input)
# --- Classification class (merged from classification.py) ---
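# Model weights are downloaded from the Hugging Face Hub into a writable temp cache,
# since the default cache location may not be writable in hosted environments.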
cache_dir = os.path.join(tempfile.gettempdir(), "hf_cache")
os.makedirs(cache_dir, exist_ok=True)

class Classification(object):
    _defaults = {
        "model_path": hf_hub_download(
            repo_id="sudo-paras-shah/micro-expression-casme2",
            filename="ep089.weights.h5",
            cache_dir=cache_dir
        ),
        "classes_path": 'src/model_data/cls_classes.txt',
        "input_shape": [224, 224],
        "backbone": 'vgg16',
        "alpha": 0.25
    }

    @classmethod
    def get_defaults(cls, n):
        if n in cls._defaults:
            return cls._defaults[n]
        else:
            return "Unrecognized attribute name '" + n + "'"

    def __init__(self, **kwargs):
        self.__dict__.update(self._defaults)
        for name, value in kwargs.items():
            setattr(self, name, value)
        self.class_names, self.num_classes = get_classes(self.classes_path)
        self.generate()

    def generate(self):
        model_path = os.path.expanduser(self.model_path)
        assert model_path.endswith('.h5'), 'Keras model or weights must be a .h5 file.'
        if self.backbone == "mobilenet":
            self.model = get_model_from_name[self.backbone](
                input_shape=[self.input_shape[0], self.input_shape[1], 3],
                classes=self.num_classes,
                alpha=self.alpha
            )
        else:
            self.model = get_model_from_name[self.backbone](
                input_shape=[self.input_shape[0], self.input_shape[1], 3],
                classes=self.num_classes
            )
        self.model.load_weights(self.model_path)
        print('{} model, and classes {} loaded.'.format(model_path, self.class_names))
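
    # Preprocess a PIL image (cvtColor, letterbox-resize to the input shape, normalize)
    # and run a single forward pass, returning the top class name and its probability.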
    def detect_image(self, image):
        image = cvtColor(image)
        image_data = letterbox_image(image, [self.input_shape[1], self.input_shape[0]])
        image_data = np.expand_dims(preprocess_input(np.array(image_data, np.float32)), 0)
        preds = self.model.predict(image_data)[0]
        class_name = self.class_names[np.argmax(preds)]
        probability = np.max(preds)
        return class_name, probability
# --- Main Streamlit App ---
if __name__ == '__main__':
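    # Build the classifier once per server process and reuse it across reruns.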
    @st.cache_resource
    def get_model():
        return Classification()

    classificator = get_model()
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_alt.xml'
    )
    if face_cascade.empty():
        st.error("Failed to load Haarcascade XML. Check the path.")

    st.title("Real-Time Micro-Emotion Recognition")
    st.write("Turn on your camera and detect emotions in real-time.")

    def face_detect(img):
        try:
            img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(
                img_gray,
                scaleFactor=1.1,
                minNeighbors=1,
                minSize=(30, 30)
            )
            return img, img_gray, faces
        except Exception as e:
            st.error(f"OpenCV face detection error: {e}")
            return img, np.zeros_like(img), []
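
    # Collapse fine-grained expression labels into four coarse classes
    # (Positive / Negative / Surprise / Others) for display and history tracking.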
    def map_emotion_to_class(emotion):
        positive = ['happiness', 'happy']
        negative = ['disgust', 'sadness', 'fear', 'sad', 'angry', 'disgusted']
        surprise = ['surprise']
        others = ['repression', 'tense', 'neutral', 'others']
        e = emotion.lower()
        if any(p in e for p in positive):
            return 'Positive'
        elif any(n in e for n in negative):
            return 'Negative'
        elif any(s in e for s in surprise):
            return 'Surprise'
        else:
            return 'Others'

    if 'emotion_history' not in st.session_state:
        st.session_state['emotion_history'] = []
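
    # streamlit-webrtc calls recv() once per incoming video frame; the processor keeps
    # its own state so detections can be reused on frames where inference is skipped.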
    class EmotionRecognitionProcessor(VideoProcessorBase):
        def __init__(self):
            self.last_class = None
            self.rapid_change_count = 0
            self.frame_count = 0
            self.last_faces = []
            self.last_img_gray = None
            self.last_results = []

        def recv(self, frame):
            border_color = (255, 0, 0)
            font_color = (0, 0, 255)
            try:
                img = frame.to_ndarray(format="bgr24")
                self.frame_count += 1
                # Run face detection and classification only on every other frame;
                # intermediate frames reuse the previous results.
                if self.frame_count % 2 == 0:
                    img_disp, img_gray, faces = face_detect(img)
                    self.last_faces = faces
                    self.last_img_gray = img_gray
                    self.last_results = []
                    current_class = None
                    if len(faces) == 0:
                        cv2.putText(
                            img_disp, 'No Face Detected.', (2, 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1
                        )
                    for (x, y, w, h) in faces:
                        # Expand the detected box slightly and clamp it to the frame.
                        x1, y1 = max(x - 10, 0), max(y - 10, 0)
                        x2 = min(x + w + 10, img_disp.shape[1])
                        y2 = min(y + h + 10, img_disp.shape[0])
                        face_img_gray = img_gray[y1:y2, x1:x2]
                        if face_img_gray.size == 0:
                            continue
                        face_img_pil = Image.fromarray(face_img_gray)
                        emotion, probability = classificator.detect_image(face_img_pil)
                        emotion_class = map_emotion_to_class(emotion)
                        self.last_results.append((x1, y1, x2, y2, emotion, probability, emotion_class))
                        current_class = emotion_class
                    if current_class:
                        history = st.session_state['emotion_history']
                        history.append(current_class)
                        if len(history) > 10:
                            history.pop(0)
                        if len(history) >= 3 and len(set(history[-3:])) > 1:
                            self.rapid_change_count += 1
                        else:
                            self.rapid_change_count = 0
                else:
                    img_disp = img.copy()
                    img_gray = self.last_img_gray
                    faces = self.last_faces
                for (x1, y1, x2, y2, emotion, probability, emotion_class) in self.last_results:
                    cv2.rectangle(
                        img_disp,
                        (x1, y1),
                        (x2, y2),
                        border_color,
                        thickness=2
                    )
                    cv2.putText(
                        img_disp, emotion, (x1 + 30, y1 - 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, font_color, 1
                    )
                    cv2.putText(
                        img_disp, str(round(probability, 3)), (x1 + 30, y1 - 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.3, font_color, 1
                    )
                if len(faces) == 0:
                    cv2.putText(
                        img_disp, 'No Face Detected.', (2, 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1
                    )
                return frame.from_ndarray(img_disp, format="bgr24")
            except Exception as e:
                st.error(f"Error in video processing: {e}")
                return frame
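
    # Hand the ICE servers gathered above to the browser via streamlit-webrtc.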
    RTC_CONFIGURATION = RTCConfiguration({"iceServers": ICE_SERVERS})
    webrtc_streamer(
        key="emotion-detection",
        video_processor_factory=EmotionRecognitionProcessor,
        rtc_configuration=RTC_CONFIGURATION,
        media_stream_constraints={"video": True, "audio": False},
    )
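
    # Warn the user if the most recently detected coarse classes changed rapidly.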
    history = st.session_state['emotion_history']
    if len(history) >= 3 and len(set(history[-3:])) > 1:
        st.warning(
            "⚠️ Rapid changes in your detected emotional state were observed. "
            "Micro-expressions may not always reflect your true feelings. "
            "If you feel emotionally unstable or distressed, "
            "consider reaching out to a mental health professional, "
            "talking it over with a close person or taking a break."
        )