project_charles / streamlit_av_queue.py
sohojoe's picture
fix use of ray.get inside async
768e92a
raw
history blame
2.77 kB
from typing import List
import av
import asyncio
from collections import deque
import threading
import numpy as np
import ray
from input_av_queue_actor import InputAVQueueActor
import pydub
import torch
class StreamlitAVQueue:
    """Forward incoming audio/video frames to the shared ``InputAVQueueActor``.

    The two ``queued_*_frames_callback`` coroutines push frame data to the
    actor; the two ``get_*_frames_async`` coroutines read it back.
    NOTE(review): the callback names suggest they are wired to
    streamlit-webrtc's ``queued_*_frames_callback`` hooks -- confirm against
    the caller.
    """

    def __init__(self, audio_bit_rate=16000):
        """Attach to (or create) the named queue actor.

        Args:
            audio_bit_rate: Value passed to pydub's ``set_frame_rate`` when
                resampling incoming audio. Despite the parameter name it is
                used as a frame (sample) rate in Hz, not as a bit rate.
        """
        self._audio_bit_rate = audio_bit_rate
        # get_if_exists=True makes Ray return the already-running actor with
        # this name instead of failing or creating a duplicate.
        self.queue_actor = InputAVQueueActor.options(
            name="InputAVQueueActor",
            get_if_exists=True,
        ).remote()

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        """Enqueue every video frame on the actor and pass the frames through.

        Each frame is converted to a torch tensor, stored with ``ray.put``
        and the resulting object reference is handed to
        ``enqueue_video_frame``. Failures are printed and otherwise ignored
        so that the frames are always returned.

        Args:
            frames: Video frames received since the previous call.

        Returns:
            The same ``frames`` list, unmodified.
        """
        try:
            for frame in frames:
                tensor_ref = ray.put(torch.from_numpy(frame.to_ndarray()))
                self.queue_actor.enqueue_video_frame.remote(tensor_ref)
        except Exception as e:
            # Best effort: a queueing failure must not interrupt the stream.
            print(e)
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        """Enqueue the audio on the actor and return silence in its place.

        All frames of the batch are merged into one mono sample array at
        ``audio_bit_rate`` Hz and handed to ``enqueue_audio_frame``. Failures
        are printed and otherwise ignored. Nothing is enqueued for an empty
        batch.

        Args:
            frames: Audio frames received since the previous call.

        Returns:
            One zero-filled frame per input frame (same shape, dtype, sample
            format, layout and sample rate), so the input audio is not
            played back as an echo.
        """
        try:
            if frames:
                samples_ref = ray.put(self._frames_to_mono_samples(frames))
                self.queue_actor.enqueue_audio_frame.remote(samples_ref)
        except Exception as e:
            # Best effort: a queueing failure must not interrupt the stream.
            print(e)
        # return empty frames to avoid echo
        return [self._silent_copy(frame) for frame in frames]

    def _frames_to_mono_samples(self, frames):
        """Concatenate ``frames`` into one mono sample array at the target rate."""
        sound_chunk = pydub.AudioSegment.empty()
        for frame in frames:
            sound = pydub.AudioSegment(
                data=frame.to_ndarray().tobytes(),
                sample_width=frame.format.bytes,
                frame_rate=frame.sample_rate,
                channels=len(frame.layout.channels),
            )
            # Normalise each frame on its own; frames are not assumed to
            # share a sample rate or channel count.
            sound = sound.set_channels(1)
            sound = sound.set_frame_rate(self._audio_bit_rate)
            sound_chunk += sound
        return np.array(sound_chunk.get_array_of_samples())

    @staticmethod
    def _silent_copy(frame):
        """Return a zero-filled audio frame mirroring ``frame``'s parameters."""
        input_array = frame.to_ndarray()
        silent_frame = av.AudioFrame.from_ndarray(
            np.zeros(input_array.shape, dtype=input_array.dtype),
            # from_ndarray defaults to "s16"; pass the real sample format so
            # that frames in any other format are not rejected.
            format=frame.format.name,
            layout=frame.layout.name,
        )
        silent_frame.sample_rate = frame.sample_rate
        return silent_frame

    async def get_audio_frames_async(self) -> List:
        """Await and return whatever the actor's ``get_audio_frames`` yields.

        NOTE(review): presumably the sample arrays enqueued by
        ``queued_audio_frames_callback`` -- confirm against the actor.
        """
        return await self.queue_actor.get_audio_frames.remote()

    async def get_video_frames_async(self) -> List:
        """Await and return whatever the actor's ``get_video_frames`` yields.

        NOTE(review): presumably the tensors enqueued by
        ``queued_video_frames_callback`` -- confirm against the actor.
        """
        return await self.queue_actor.get_video_frames.remote()