from typing import List

import threading

import av
import cv2
import numpy as np
import pydub
import ray
from ray.util.queue import Queue

from webrtc_av_queue_actor import WebRtcAVQueueActor


class StreamlitAVQueue:
    """Bridges queued WebRTC A/V frame callbacks to a shared Ray queue actor."""

    def __init__(self, audio_bit_rate: int = 16000):
        self._output_channels = 2
        self._audio_bit_rate = audio_bit_rate
        self._listening = True
        self._looking = False
        self._lock = threading.Lock()
        # Reuse a single named actor across Streamlit reruns.
        self.queue_actor = WebRtcAVQueueActor.options(
            name="WebRtcAVQueueActor",
            get_if_exists=True,
        ).remote()
        self._out_video_frame = None

    def set_looking_listening(self, looking: bool, listening: bool):
        with self._lock:
            self._looking = looking
            self._listening = listening

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        updated_frames = []
        try:
            with self._lock:
                should_look = self._looking
            next_out_video_frame = await self.queue_actor.get_out_video_frame.remote()
            if next_out_video_frame is not None:
                self._out_video_frame = next_out_video_frame
            for frame in frames:
                user_image = frame.to_ndarray(format="rgb24")
                if should_look:
                    shared_tensor_ref = ray.put(user_image)
                    await self.queue_actor.enqueue_in_video_frame.remote(shared_tensor_ref)
                if self._out_video_frame is not None:
                    out_image = self._out_video_frame
                    # Resize the user's image to 1/4 size and mirror it, then
                    # composite it into the bottom-left corner of the outgoing
                    # frame as a picture-in-picture overlay.
                    user_frame = cv2.resize(
                        user_image,
                        (user_image.shape[1] // 4, user_image.shape[0] // 4),
                        interpolation=cv2.INTER_AREA,
                    )
                    user_frame = cv2.flip(user_frame, 1)
                    x_user = 0
                    y_user = out_image.shape[0] - user_frame.shape[0]
                    final_frame = out_image.copy()
                    final_frame[
                        y_user:y_user + user_frame.shape[0],
                        x_user:x_user + user_frame.shape[1],
                    ] = user_frame
                    frame = av.VideoFrame.from_ndarray(final_frame, format="rgb24")
                updated_frames.append(frame)
        except Exception as e:
            print(e)
        return updated_frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        try:
            with self._lock:
                should_listen = self._listening
            sound_chunk = pydub.AudioSegment.empty()
            if len(frames) > 0 and should_listen:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    # Downmix to mono and resample to the target rate before
                    # handing the chunk to the queue actor.
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                await self.queue_actor.enqueue_in_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            print(e)

        # Replace the incoming audio with queued output audio (or silence)
        # to avoid echoing the user's own voice back.
        new_frames = []
        try:
            for frame in frames:
                required_samples = frame.samples
                assert frame.format.bytes == 2
                assert frame.format.name == "s16"
                frame_as_bytes = await self.queue_actor.get_out_audio_frame.remote()
                if frame_as_bytes:
                    assert len(frame_as_bytes) == frame.samples * frame.format.bytes
                    samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
                else:
                    # No queued audio: emit mono silence sized to the incoming
                    # frame; the stereo duplication below handles channel
                    # expansion, so the buffer must not be pre-doubled here.
                    samples = np.zeros(required_samples, dtype=np.int16)
                if self._output_channels == 2:
                    # Interleave the mono signal into L/R sample pairs.
                    samples = np.vstack((samples, samples)).reshape((-1,), order="F")
                samples = samples.reshape(1, -1)
                layout = "stereo" if self._output_channels == 2 else "mono"
                new_frame = av.AudioFrame.from_ndarray(samples, format="s16", layout=layout)
                new_frame.sample_rate = frame.sample_rate
                new_frames.append(new_frame)
        except Exception as e:
            print(e)
        return new_frames

    async def get_in_audio_frames_async(self) -> List[np.ndarray]:
        shared_buffers = await self.queue_actor.get_in_audio_frames.remote()
        return shared_buffers

    async def get_video_frames_async(self) -> List[np.ndarray]:
        shared_tensors = await self.queue_actor.get_in_video_frames.remote()
        return shared_tensors

    # NOTE: .remote() returns an ObjectRef; callers must resolve it
    # (e.g. with ray.get or await) to obtain the actual Queue handle.
    def get_out_audio_queue(self) -> Queue:
        return self.queue_actor.get_out_audio_queue.remote()

    def get_out_video_queue(self) -> Queue:
        return self.queue_actor.get_out_video_queue.remote()
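
# Usage sketch, kept commented out so importing this module has no side
# effects. It assumes streamlit-webrtc's queued-frame callback parameters,
# whose names match the methods above; the "key" value is illustrative and
# not defined by this module.
#
#     from streamlit_webrtc import WebRtcMode, webrtc_streamer
#
#     streamlit_av_queue = StreamlitAVQueue()
#     webrtc_streamer(
#         key="streamlit-av-queue",  # hypothetical widget key
#         mode=WebRtcMode.SENDRECV,
#         queued_video_frames_callback=streamlit_av_queue.queued_video_frames_callback,
#         queued_audio_frames_callback=streamlit_av_queue.queued_audio_frames_callback,
#         async_processing=True,
#     )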