Spaces:
Runtime error
Runtime error
File size: 5,747 Bytes
4904f3e 162d5c8 cf5e7f4 162d5c8 62a21bd cf5e7f4 72e4889 62a21bd 162d5c8 62a21bd d91a673 62a21bd e2846c4 09ede70 e2846c4 72e4889 aec6f97 e2846c4 149eeaf e2846c4 149eeaf e2846c4 ad67495 162d5c8 9ed41df cf5e7f4 62a21bd 149eeaf 28b5e08 cf5e7f4 4904f3e cf5e7f4 4904f3e cf5e7f4 72e4889 aec6f97 cf5e7f4 7925882 cf5e7f4 9ed41df 62a21bd cf5e7f4 162d5c8 62a21bd e2846c4 62a21bd e2846c4 62a21bd 72e4889 62a21bd 162d5c8 d91a673 a642a9f 28b5e08 a642a9f d91a673 162d5c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
from typing import List
import av
import asyncio
from collections import deque
import threading
import cv2
import numpy as np
import ray
from ray.util.queue import Queue
from app_interface_actor import AppInterfaceActor
import pydub
import torch
class StreamlitAVQueue:
    """Bridge between queued WebRTC A/V frame callbacks and the app interface actor.

    Incoming video/audio frames are (optionally) forwarded to the
    ``AppInterfaceActor`` singleton through the Ray object store, and the
    frames returned to the caller are built from whatever output the actor
    has queued.
    """

    def __init__(self, audio_bit_rate=16000):
        """Initialise state and look up the shared ``AppInterfaceActor``.

        Args:
            audio_bit_rate: Frame rate (Hz) that incoming audio is resampled
                to before it is forwarded to the actor.
        """
        self._output_channels = 2
        self._audio_bit_rate = audio_bit_rate
        self._listening = True
        self._looking = False
        # Guards _looking / _listening, which are written by
        # set_looking_listening() and read by the frame callbacks.
        self._lock = threading.Lock()
        self.app_interface_actor = AppInterfaceActor.get_singleton()
        # Most recent output frame dequeued from the actor; reused for every
        # incoming frame until a newer one is dequeued.
        self._video_output_frame = None

    def set_looking_listening(self, looking: bool, listening: bool):
        """Enable or disable forwarding of video (looking) and audio (listening)."""
        with self._lock:
            self._looking = looking
            self._listening = listening

    @staticmethod
    def _frame_to_rgb(frame):
        """Return ``frame`` as an rgb24 ndarray with fd 2 redirected to devnull.

        The redirection suppresses the ffmpeg warning emitted during the
        conversion. stderr is restored and both temporary descriptors are
        closed even when the conversion raises.
        """
        saved_stderr_fd = os.dup(2)
        try:
            devnull_fd = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(devnull_fd, 2)
                return frame.to_ndarray(format="rgb24")
            finally:
                os.dup2(saved_stderr_fd, 2)
                os.close(devnull_fd)
        finally:
            os.close(saved_stderr_fd)

    @staticmethod
    def _overlay_user_image(background, user_image):
        """Return a copy of ``background`` with a mirrored thumbnail of ``user_image``.

        The thumbnail is the user image scaled to 1/4 of its width and height,
        flipped horizontally, and pasted into the bottom-left corner.
        """
        # resize user image to 1/4 size
        user_frame = cv2.resize(
            user_image,
            (user_image.shape[1] // 4, user_image.shape[0] // 4),
            interpolation=cv2.INTER_AREA,
        )
        # flip horizontally
        user_frame = cv2.flip(user_frame, 1)
        thumb_height, thumb_width = user_frame.shape[0], user_frame.shape[1]
        y_user = background.shape[0] - thumb_height
        final_frame = background.copy()
        final_frame[y_user:y_user + thumb_height, 0:thumb_width] = user_frame
        return final_frame

    @staticmethod
    def _build_output_samples(frame_as_bytes, required_samples, output_channels):
        """Build the ``(1, N)`` int16 sample array for one outgoing audio frame.

        Args:
            frame_as_bytes: int16 PCM bytes, one value per sample, or a falsy
                value when no output audio is available.
            required_samples: Number of samples the outgoing frame must hold.
            output_channels: 2 duplicates every sample so the two channels are
                interleaved; any other value leaves the samples as they are.

        Returns:
            An int16 array of shape ``(1, N)``; silence (zeros) when
            ``frame_as_bytes`` is falsy.

        Raises:
            ValueError: If ``frame_as_bytes`` does not hold exactly
                ``required_samples`` int16 values.
        """
        if frame_as_bytes:
            expected_bytes = required_samples * 2  # 2 bytes per int16 sample
            if len(frame_as_bytes) != expected_bytes:
                raise ValueError(
                    f"expected {expected_bytes} bytes of audio, got {len(frame_as_bytes)}"
                )
            samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
        else:
            # One zero per sample, so a silent frame has the same length as a
            # frame built from real data.
            samples = np.zeros(required_samples, dtype=np.int16)
        if output_channels == 2:
            # Column-major flattening of the stacked rows interleaves them:
            # s0, s0, s1, s1, ...
            samples = np.vstack((samples, samples)).reshape((-1,), order='F')
        return samples.reshape(1, -1)

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        """Process a batch of incoming video frames and return the frames to send back.

        Each incoming frame is converted to an RGB array and, while "looking"
        is enabled, forwarded to the actor. If an output frame has been
        dequeued from the actor, the returned frame is that output frame with
        a thumbnail of the user image overlaid; otherwise the incoming frame
        is returned unchanged.

        Any exception is printed and the frames processed so far are returned.
        """
        updated_frames = []
        try:
            with self._lock:
                should_look = self._looking
            video_output_frames = await self.app_interface_actor.dequeue_video_output_frames_async.remote()
            if len(video_output_frames) > 0:
                # Only the newest output frame is kept.
                self._video_output_frame = video_output_frames[-1]
            for frame in frames:
                user_image = self._frame_to_rgb(frame)
                if should_look:
                    shared_tensor_ref = ray.put(user_image)
                    await self.app_interface_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
                if self._video_output_frame is not None:
                    final_frame = self._overlay_user_image(self._video_output_frame, user_image)
                    frame = av.VideoFrame.from_ndarray(final_frame, format="rgb24")
                updated_frames.append(frame)
        except Exception as e:
            print(e)
        return updated_frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        """Forward incoming audio to the actor and return the actor's output audio.

        While "listening" is enabled, the incoming frames are merged into one
        mono chunk resampled to the configured rate and enqueued on the actor.
        The returned frames never contain the incoming audio (to avoid echo):
        one frame is produced per incoming frame, filled with the audio bytes
        dequeued from the actor, or with silence when none are available.

        Exceptions in either phase are printed; the frames built so far are
        returned.
        """
        import time  # function-scope: not among this file's top-level imports

        try:
            with self._lock:
                should_listen = self._listening
            if len(frames) > 0 and should_listen:
                sound_chunk = pydub.AudioSegment.empty()
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                await self.app_interface_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
        except Exception as e:
            print(e)

        # return empty frames to avoid echo
        new_frames = []
        try:
            layout = 'stereo' if self._output_channels == 2 else 'mono'
            for frame in frames:
                if frame.format.bytes != 2 or frame.format.name != 's16':
                    raise ValueError(
                        f"unsupported audio format: {frame.format.name} "
                        f"({frame.format.bytes} bytes per sample), expected s16"
                    )
                start_time = time.monotonic()
                frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
                elapsed_time = time.monotonic() - start_time
                if elapsed_time > 0.1:
                    print(f"app_interface_actor.dequeue_audio_output_frame_async() elapsed_time: {elapsed_time}")
                samples = self._build_output_samples(
                    frame_as_bytes, frame.samples, self._output_channels
                )
                new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
                new_frame.sample_rate = frame.sample_rate
                new_frames.append(new_frame)
        except Exception as e:
            print(e)
        return new_frames
|