from typing import List

import av
import numpy as np
import pydub
import ray
import torch

from input_av_queue_actor import InputAVQueueActor


class StreamlitAVQueue:
    """Bridges streamlit-webrtc frame callbacks to a shared Ray queue actor.

    Incoming video frames are forwarded as torch tensors and incoming audio
    frames as mono, resampled numpy buffers; both are placed in the Ray object
    store and enqueued on a named InputAVQueueActor so other processes can
    consume them.
    """

    def __init__(self, audio_bit_rate: int = 16000):
        self._audio_bit_rate = audio_bit_rate
        # Create (or attach to) a single named queue actor shared across sessions.
        self.queue_actor = InputAVQueueActor.options(
            name="InputAVQueueActor",
            get_if_exists=True,
        ).remote()

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        try:
            for frame in frames:
                # Convert the frame to a tensor, put it in the Ray object store,
                # and enqueue the reference without waiting on the actor.
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            print(e)
        # Pass the original frames through unchanged for display.
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        try:
            sound_chunk = pydub.AudioSegment.empty()
            if frames:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    # Downmix to mono and resample to the configured rate.
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            print(e)
        # Return silent frames of the same shape and format to avoid echoing
        # the caller's microphone back to them.
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                format=frame.format.name,
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)
        return new_frames

    async def get_audio_frames_async(self) -> List[np.ndarray]:
        # Returns the mono audio buffers previously enqueued on the actor.
        shared_buffers = await self.queue_actor.get_audio_frames.remote()
        return shared_buffers

    async def get_video_frames_async(self) -> List[torch.Tensor]:
        # Returns the video tensors previously enqueued on the actor.
        shared_tensors = await self.queue_actor.get_video_frames.remote()
        return shared_tensors
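

# --- Usage sketch (illustrative only) ----------------------------------------
# A minimal sketch of how this queue might be wired into a Streamlit app with
# streamlit-webrtc. It assumes ray.init() has been called, that this file is
# saved as streamlit_av_queue.py, and that the installed streamlit-webrtc
# version exposes the queued_audio_frames_callback / queued_video_frames_callback
# parameters these methods are named after.
#
#   import ray
#   from streamlit_webrtc import WebRtcMode, webrtc_streamer
#   from streamlit_av_queue import StreamlitAVQueue
#
#   ray.init(ignore_reinit_error=True)
#   av_queue = StreamlitAVQueue(audio_bit_rate=16000)
#
#   webrtc_streamer(
#       key="streamlit-av-queue",
#       mode=WebRtcMode.SENDRECV,
#       queued_audio_frames_callback=av_queue.queued_audio_frames_callback,
#       queued_video_frames_callback=av_queue.queued_video_frames_callback,
#       async_processing=True,
#   )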