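"""Bridge streamlit_webrtc media callbacks to a shared Ray queue actor.

Incoming video frames are converted to torch tensors; incoming audio frames
are downmixed to mono, resampled to the configured bit rate, and converted to
numpy buffers. Both are placed in the Ray object store and handed to a named
InputAVQueueActor (enqueue_video_frame / enqueue_audio_frame), from which
consumers read them back via get_video_frames_async / get_audio_frames_async.
"""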
from typing import List

import av
import numpy as np
import pydub
import ray
import torch

from input_av_queue_actor import InputAVQueueActor


class StreamlitAVQueue:
    def __init__(self, audio_bit_rate=16000):
        self._audio_bit_rate = audio_bit_rate
        # Create (or attach to) a single named queue actor so that every
        # session shares the same input queues.
        self.queue_actor = InputAVQueueActor.options(
            name="InputAVQueueActor",
            get_if_exists=True,
        ).remote()

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        try:
            for frame in frames:
                # Put each frame into the Ray object store as a tensor; only
                # the reference is sent across the actor boundary.
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            print(e)
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        try:
            sound_chunk = pydub.AudioSegment.empty()
            if len(frames) > 0:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    # Downmix to mono and resample to the target rate before
                    # appending to this batch's chunk.
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            print(e)
        # Return silent frames of the same shape so the caller's own audio is
        # not echoed back to the browser.
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                format=frame.format.name,
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)
        return new_frames

    async def get_audio_frames_async(self) -> List[np.ndarray]:
        shared_buffers = await self.queue_actor.get_audio_frames.remote()
        return shared_buffers

    async def get_video_frames_async(self) -> List[torch.Tensor]:
        shared_tensors = await self.queue_actor.get_video_frames.remote()
        return shared_tensors
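

# Usage sketch (illustrative, not part of the original module): wiring the
# queue into streamlit_webrtc. This assumes a streamlit_webrtc version whose
# webrtc_streamer() supports the queued_video_frames_callback /
# queued_audio_frames_callback parameters; the widget key and the helper name
# _example_usage are hypothetical.
def _example_usage():
    from streamlit_webrtc import WebRtcMode, webrtc_streamer

    ray.init(ignore_reinit_error=True)
    av_queue = StreamlitAVQueue(audio_bit_rate=16000)

    webrtc_streamer(
        key="streamlit-av-queue",  # hypothetical widget key
        mode=WebRtcMode.SENDRECV,
        media_stream_constraints={"video": True, "audio": True},
        queued_video_frames_callback=av_queue.queued_video_frames_callback,
        queued_audio_frames_callback=av_queue.queued_audio_frames_callback,
        async_processing=True,
    )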