Spaces:
Sleeping
Sleeping
from collections import deque | |
import os | |
import threading | |
import time | |
import av | |
import numpy as np | |
import streamlit as st | |
from streamlit_webrtc import WebRtcMode, webrtc_streamer | |
import pydub | |
import torch | |
# import av | |
# import cv2 | |
from sample_utils.turn import get_ice_servers | |
import json | |
from typing import List | |
from vosk import SetLogLevel, Model, KaldiRecognizer | |
SetLogLevel(-1) # mutes vosk verbosity | |
from dotenv import load_dotenv | |
load_dotenv() | |
system_one = { | |
"audio_bit_rate": 16000, | |
# "audio_bit_rate": 32000, | |
# "audio_bit_rate": 48000, | |
# "vision_embeddings_fps": 5, | |
"vision_embeddings_fps": 2, | |
} | |
system_one["video_detection_emotions"] = [ | |
"a happy person", | |
"the person is happy", | |
"the person's emotional state is happy", | |
"a sad person", | |
"a scared person", | |
"a disgusted person", | |
"an angry person", | |
"a suprised person", | |
"a bored person", | |
"an interested person", | |
"a guilty person", | |
"an indiffert person", | |
"a distracted person", | |
] | |
# system_one["video_detection_emotions"] = [ | |
# "Happiness", | |
# "Sadness", | |
# "Fear", | |
# "Disgust", | |
# "Anger", | |
# "Surprise", | |
# "Boredom", | |
# "Interest", | |
# "Excitement", | |
# "Guilt", | |
# "Shame", | |
# "Relief", | |
# "Love", | |
# "Embarrassment", | |
# "Pride", | |
# "Envy", | |
# "Jealousy", | |
# "Anxiety", | |
# "Hope", | |
# "Despair", | |
# "Frustration", | |
# "Confusion", | |
# "Curiosity", | |
# "Contentment", | |
# "Indifference", | |
# "Anticipation", | |
# "Gratitude", | |
# "Bitterness" | |
# ] | |
system_one["video_detection_engement"] = [ | |
"the person is engaged in the conversation", | |
"the person is not engaged in the conversation", | |
"the person is looking at me", | |
"the person is not looking at me", | |
"the person is talking to me", | |
"the person is not talking to me", | |
"the person is engaged", | |
"the person is talking", | |
"the person is listening", | |
] | |
system_one["video_detection_present"] = [ | |
"the view from a webcam", | |
"the view from a webcam we see a person", | |
# "the view from a webcam. I see a person", | |
# "the view from a webcam. The person is looking at the camera", | |
# "i am a webcam", | |
# "i am a webcam and i see a person", | |
# "i am a webcam and i see a person. The person is looking at me", | |
# "a person", | |
# "a person on a Zoom call", | |
# "a person on a FaceTime call", | |
# "a person on a WebCam call", | |
# "no one", | |
# " ", | |
# "multiple people", | |
# "a group of people", | |
] | |
system_one_audio_status = st.empty() | |
playing = st.checkbox("Playing", value=True) | |
def load_vosk (model='small'): | |
# load vosk model | |
# get path of current file | |
current_file_path = os.path.abspath(__file__) | |
current_directory = os.path.dirname(current_file_path) | |
_path = os.path.join(current_directory, 'models', 'vosk', model) | |
model_voice = Model(_path) | |
recognizer = KaldiRecognizer(model_voice, system_one['audio_bit_rate']) | |
return recognizer | |
vask = load_vosk() | |
def handle_audio_frame(frame): | |
# if self.vosk.AcceptWaveform(data): | |
pass | |
def do_work(data: bytearray) -> tuple[str, bool]: | |
text = '' | |
speaker_finished = False | |
if vask.AcceptWaveform(data): | |
result = vask.Result() | |
result_json = json.loads(result) | |
text = result_json['text'] | |
speaker_finished = True | |
else: | |
result = vask.PartialResult() | |
result_json = json.loads(result) | |
text = result_json['partial'] | |
return text, speaker_finished | |
audio_frames_deque_lock = threading.Lock() | |
audio_frames_deque: deque = deque([]) | |
video_frames_deque_lock = threading.Lock() | |
video_frames_deque: deque = deque([]) | |
async def queued_video_frames_callback( | |
frames: List[av.AudioFrame], | |
) -> av.AudioFrame: | |
with video_frames_deque_lock: | |
video_frames_deque.extend(frames) | |
return frames | |
async def queued_audio_frames_callback( | |
frames: List[av.AudioFrame], | |
) -> av.AudioFrame: | |
with audio_frames_deque_lock: | |
audio_frames_deque.extend(frames) | |
# create frames to be returned. | |
new_frames = [] | |
for frame in frames: | |
input_array = frame.to_ndarray() | |
new_frame = av.AudioFrame.from_ndarray( | |
np.zeros(input_array.shape, dtype=input_array.dtype), | |
layout=frame.layout.name, | |
) | |
new_frame.sample_rate = frame.sample_rate | |
new_frames.append(new_frame) | |
# TODO: replace with the audio we want to send to the other side. | |
return new_frames | |
system_one_audio_status.write("Initializing CLIP model") | |
from clip_transform import CLIPTransform | |
clip_transform = CLIPTransform() | |
system_one_audio_status.write("Initializing CLIP templates") | |
embeddings = clip_transform.text_to_embeddings(system_one["video_detection_emotions"]) | |
system_one["video_detection_emotions_embeddings"] = embeddings | |
embeddings = clip_transform.text_to_embeddings(system_one["video_detection_engement"]) | |
system_one["video_detection_engement_embeddings"] = embeddings | |
embeddings = clip_transform.text_to_embeddings(system_one["video_detection_present"]) | |
system_one["video_detection_present_embeddings"] = embeddings | |
system_one_audio_status.write("Initializing webrtc_streamer") | |
webrtc_ctx = webrtc_streamer( | |
key="charles", | |
desired_playing_state=playing, | |
# audio_receiver_size=4096, | |
queued_audio_frames_callback=queued_audio_frames_callback, | |
queued_video_frames_callback=queued_video_frames_callback, | |
mode=WebRtcMode.SENDRECV, | |
rtc_configuration={"iceServers": get_ice_servers()}, | |
async_processing=True, | |
) | |
if not webrtc_ctx.state.playing: | |
exit | |
system_one_audio_status.write("Initializing streaming") | |
system_one_audio_output = st.empty() | |
system_one_video_output = st.empty() | |
system_one_audio_history = [] | |
system_one_audio_history_output = st.empty() | |
sound_chunk = pydub.AudioSegment.empty() | |
current_video_embedding = None | |
current_video_embedding_timestamp = time.monotonic() | |
def get_dot_similarities(video_embedding, embeddings, embeddings_labels): | |
dot_product = torch.mm(embeddings, video_embedding.T) | |
similarity_image_label = [(float("{:.4f}".format(dot_product[i][0])), embeddings_labels[i]) for i in range(len(embeddings_labels))] | |
similarity_image_label.sort(reverse=True) | |
return similarity_image_label | |
def get_top_3_similarities_as_a_string(video_embedding, embeddings, embeddings_labels): | |
similarities = get_dot_similarities(video_embedding, embeddings, embeddings_labels) | |
top_3 = "" | |
range_len = 3 if len(similarities) > 3 else len(similarities) | |
for i in range(range_len): | |
top_3 += f"{similarities[i][1]} ({similarities[i][0]}) " | |
return top_3 | |
while True: | |
if webrtc_ctx.state.playing: | |
# handle video | |
video_frames = [] | |
with video_frames_deque_lock: | |
while len(video_frames_deque) > 0: | |
frame = video_frames_deque.popleft() | |
video_frames.append(frame) | |
get_embeddings = False | |
get_embeddings |= current_video_embedding is None | |
current_time = time.monotonic() | |
elapsed_time = current_time - current_video_embedding_timestamp | |
get_embeddings |= elapsed_time > 1. / system_one['vision_embeddings_fps'] | |
if get_embeddings and len(video_frames) > 0: | |
current_video_embedding_timestamp = current_time | |
current_video_embedding = clip_transform.image_to_embeddings(video_frames[-1].to_ndarray()) | |
emotions_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_emotions_embeddings"], system_one["video_detection_emotions"]) | |
engagement_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_engement_embeddings"], system_one["video_detection_engement"]) | |
present_top_3 = get_top_3_similarities_as_a_string(current_video_embedding, system_one["video_detection_present_embeddings"], system_one["video_detection_present"]) | |
# table_content = "**System 1 Video:**\n\n" | |
table_content = "| System 1 Video | |\n| --- | --- |\n" | |
table_content += f"| Present | {present_top_3} |\n" | |
table_content += f"| Emotion | {emotions_top_3} |\n" | |
table_content += f"| Engagement | {engagement_top_3} |\n" | |
system_one_video_output.markdown(table_content) | |
# system_one_video_output.markdown(f"**System 1 Video:** \n [Emotion: {emotions_top_3}], \n [Engagement: {engagement_top_3}], \n [Present: {present_top_3}] ") | |
# for similarity, image_label in similarity_image_label: | |
# print (f"{similarity} {image_label}") | |
# handle audio | |
audio_frames = [] | |
with audio_frames_deque_lock: | |
while len(audio_frames_deque) > 0: | |
frame = audio_frames_deque.popleft() | |
audio_frames.append(frame) | |
if len(audio_frames) == 0: | |
time.sleep(0.1) | |
system_one_audio_status.write("No frame arrived.") | |
continue | |
system_one_audio_status.write("Running. Say something!") | |
for audio_frame in audio_frames: | |
sound = pydub.AudioSegment( | |
data=audio_frame.to_ndarray().tobytes(), | |
sample_width=audio_frame.format.bytes, | |
frame_rate=audio_frame.sample_rate, | |
channels=len(audio_frame.layout.channels), | |
) | |
sound = sound.set_channels(1) | |
sound = sound.set_frame_rate(system_one['audio_bit_rate']) | |
sound_chunk += sound | |
if len(sound_chunk) > 0: | |
buffer = np.array(sound_chunk.get_array_of_samples()) | |
text, speaker_finished = do_work(buffer.tobytes()) | |
system_one_audio_output.markdown(f"**System 1 Audio:** {text}") | |
if speaker_finished and len(text) > 0: | |
system_one_audio_history.append(text) | |
if len(system_one_audio_history) > 10: | |
system_one_audio_history = system_one_audio_history[-10:] | |
table_content = "| System 1 Audio History |\n| --- |\n" | |
table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)]) | |
system_one_audio_history_output.markdown(table_content) | |
sound_chunk = pydub.AudioSegment.empty() | |
else: | |
system_one_audio_status.write("Stopped.") | |
break | |