Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import asyncio
import os
import threading
import time
from collections import deque
from typing import List

import av
import cv2
import numpy as np
import pydub
import ray
import torch
from ray.util.queue import Queue

from app_interface_actor import AppInterfaceActor
class StreamlitAVQueue:
    """Relay audio/video frames between a WebRTC stream and ``AppInterfaceActor``.

    The two ``queued_*_frames_callback`` coroutines receive a batch of incoming
    frames and return the frames to send back (presumably registered as
    streamlit-webrtc queued callbacks -- NOTE(review): confirm against caller):

    * video: while "looking", every user frame is forwarded to the actor. Once
      the actor has produced an output image, each returned frame is that image
      with a mirrored quarter-size thumbnail of the user in the bottom-left
      corner; before that, the user's frames are returned unchanged.
    * audio: while "listening", the batch is downmixed to mono at
      ``audio_bit_rate`` Hz and forwarded to the actor. The user's own audio is
      never returned; each returned frame carries actor-supplied audio, or
      silence when the actor has none.

    Both callbacks are best-effort: an exception is printed and whatever
    frames were produced before it are returned.
    """

    def __init__(self, audio_bit_rate=16000):
        """Create the relay and look up the actor singleton.

        Args:
            audio_bit_rate: sample rate in Hz that forwarded input audio is
                resampled to. It is passed to
                ``pydub.AudioSegment.set_frame_rate``; despite the name it is
                not a bit rate.
        """
        self._output_channels = 2  # returned audio frames are stereo
        self._audio_bit_rate = audio_bit_rate
        self._listening = True
        self._looking = False
        # Guards the _looking/_listening flags shared between
        # set_looking_listening and the callbacks.
        self._lock = threading.Lock()
        self.app_interface_actor = AppInterfaceActor.get_singleton()
        # Most recent output image from the actor; reused until a newer one
        # arrives. Treated as an RGB ndarray (.shape/.copy, rgb24 conversion).
        self._video_output_frame = None

    def set_looking_listening(self, looking: bool, listening: bool):
        """Enable/disable forwarding of user video (looking) and audio (listening)."""
        with self._lock:
            self._looking = looking
            self._listening = listening

    @staticmethod
    def _to_rgb_ndarray_quietly(frame: av.VideoFrame) -> np.ndarray:
        """Return ``frame`` as an rgb24 ndarray with fd 2 silenced during conversion.

        The original code redirects fd 2 (not just ``sys.stderr``) to suppress
        an FFmpeg warning printed during the conversion. The saved fd is always
        restored and both temporary fds closed, even if the conversion raises.

        NOTE(review): fd 2 is process-wide, so stderr output from any other
        thread during the conversion is discarded as well.
        """
        saved_stderr_fd = os.dup(2)
        try:
            devnull_fd = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(devnull_fd, 2)
                return frame.to_ndarray(format="rgb24")
            finally:
                os.dup2(saved_stderr_fd, 2)
                os.close(devnull_fd)
        finally:
            os.close(saved_stderr_fd)

    @staticmethod
    def _overlay_user_thumbnail(output_image: np.ndarray, user_image: np.ndarray) -> np.ndarray:
        """Return a copy of ``output_image`` with ``user_image`` pasted bottom-left.

        The user image is shrunk to 1/4 of its width and height and mirrored
        horizontally. NOTE(review): assumes the thumbnail fits inside
        ``output_image``; if it does not, the slice assignment raises.
        """
        thumbnail = cv2.resize(
            user_image,
            (user_image.shape[1] // 4, user_image.shape[0] // 4),
            interpolation=cv2.INTER_AREA,
        )
        thumbnail = cv2.flip(thumbnail, 1)  # flipCode=1: mirror horizontally
        thumb_height, thumb_width = thumbnail.shape[:2]
        top = output_image.shape[0] - thumb_height
        composed = output_image.copy()
        composed[top:top + thumb_height, 0:thumb_width] = thumbnail
        return composed

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        """Forward user video to the actor and return the frames to display.

        Args:
            frames: incoming user video frames.

        Returns:
            One frame per input frame. The list is shorter if an exception
            interrupted processing; the exception is printed, not raised.
        """
        updated_frames = []
        try:
            with self._lock:
                should_look = self._looking
            video_output_frames = await self.app_interface_actor.dequeue_video_output_frames_async.remote()
            if len(video_output_frames) > 0:
                # Only the newest output image is kept.
                self._video_output_frame = video_output_frames[-1]
            for frame in frames:
                user_image = self._to_rgb_ndarray_quietly(frame)
                if should_look:
                    shared_tensor_ref = ray.put(user_image)
                    await self.app_interface_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
                if self._video_output_frame is not None:
                    composed = self._overlay_user_thumbnail(self._video_output_frame, user_image)
                    frame = av.VideoFrame.from_ndarray(composed, format="rgb24")
                updated_frames.append(frame)
        except Exception as e:
            print(e)
        return updated_frames

    async def _enqueue_audio_input(self, frames: List[av.AudioFrame]):
        """Downmix ``frames`` to one mono buffer at ``self._audio_bit_rate`` Hz and enqueue it."""
        sound_chunk = pydub.AudioSegment.empty()
        for frame in frames:
            sound = pydub.AudioSegment(
                data=frame.to_ndarray().tobytes(),
                sample_width=frame.format.bytes,
                frame_rate=frame.sample_rate,
                channels=len(frame.layout.channels),
            )
            sound_chunk += sound.set_channels(1).set_frame_rate(self._audio_bit_rate)
        shared_buffer = np.array(sound_chunk.get_array_of_samples())
        shared_buffer_ref = ray.put(shared_buffer)
        await self.app_interface_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)

    async def _build_output_audio_frame(self, frame: av.AudioFrame) -> av.AudioFrame:
        """Return a frame matching ``frame``'s sample count and rate, carrying actor audio.

        The actor supplies mono s16 bytes (one chunk per call). When it returns
        nothing, the frame is filled with silence.

        Raises:
            ValueError: if ``frame`` is not s16, or the actor's chunk is not
                exactly ``frame.samples`` mono s16 samples.
        """
        if frame.format.bytes != 2 or frame.format.name != 's16':
            raise ValueError(
                f"expected s16 audio frame, got {frame.format.name!r} "
                f"({frame.format.bytes} bytes/sample)"
            )
        required_samples = frame.samples
        start_time = time.perf_counter()
        frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
        elapsed_time = time.perf_counter() - start_time
        if elapsed_time > 0.1:
            print(f"app_interface_actor.dequeue_audio_output_frame_async() elapsed_time: {elapsed_time}")
        if frame_as_bytes:
            expected_len = required_samples * frame.format.bytes
            if len(frame_as_bytes) != expected_len:
                raise ValueError(
                    f"expected {expected_len} bytes of mono s16 audio, got {len(frame_as_bytes)}"
                )
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # Silence. np.zeros counts int16 elements, not bytes, so this is
            # exactly `required_samples` mono samples -- the same length the
            # frombuffer branch yields.
            samples = np.zeros(required_samples, dtype=np.int16)
        if self._output_channels == 2:
            # s16 stereo is interleaved: duplicate each mono sample as an L/R pair.
            samples = np.repeat(samples, 2)
        layout = 'stereo' if self._output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples.reshape(1, -1), format='s16', layout=layout)
        new_frame.sample_rate = frame.sample_rate
        return new_frame

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        """Forward user audio to the actor and return actor audio in its place.

        Args:
            frames: incoming user audio frames (must be s16).

        Returns:
            One output frame per input frame. The list is shorter if an
            exception interrupted processing; the exception is printed, not
            raised.
        """
        try:
            with self._lock:
                should_listen = self._listening
            if frames and should_listen:
                await self._enqueue_audio_input(frames)
        except Exception as e:
            print(e)
        # The user's own audio is never sent back (that would echo); each input
        # frame is replaced by actor audio or silence of the same length.
        new_frames = []
        try:
            for frame in frames:
                new_frames.append(await self._build_output_audio_frame(frame))
        except Exception as e:
            print(e)
        return new_frames