project_charles / streamlit_av_queue.py
sohojoe's picture
Charles running between two applications
ad67495
raw
history blame
2.76 kB
from typing import List
import av
import asyncio
from collections import deque
import threading
import numpy as np
import ray
from input_av_queue_actor import InputAVQueueActor
import pydub
import torch
class StreamlitAVQueue:
    """Forward incoming audio/video frames to a shared, named Ray actor queue.

    The two ``queued_*_frames_callback`` coroutines receive batches of PyAV
    frames, convert them to plain arrays/tensors, place them in the Ray
    object store and enqueue them on the ``InputAVQueueActor``. The
    ``get_*_frames`` methods read back whatever the actor has accumulated.

    NOTE(review): the callback names suggest these are meant to be used as
    streamlit-webrtc queued frame callbacks -- confirm against the caller.
    """

    def __init__(self, audio_bit_rate=16000):
        """Attach to (or create) the named ``InputAVQueueActor``.

        Args:
            audio_bit_rate: Despite the name, this value is passed to
                ``AudioSegment.set_frame_rate`` and is therefore used as the
                target sample rate for enqueued audio.
        """
        self._audio_bit_rate = audio_bit_rate
        # get_if_exists=True makes every instance share the single actor
        # registered under this name instead of failing on a name clash.
        self.queue_actor = InputAVQueueActor.options(
            name="InputAVQueueActor",
            get_if_exists=True,
        ).remote()

    async def queued_video_frames_callback(
        self,
        frames: List[av.VideoFrame],
    ) -> List[av.VideoFrame]:
        """Enqueue each video frame as a torch tensor and echo the frames back.

        Args:
            frames: Batch of frames delivered to the callback.

        Returns:
            The same ``frames`` list, unmodified.
        """
        try:
            for frame in frames:
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            # Best effort: a failure to enqueue must not break the stream.
            print (e)
        return frames

    async def queued_audio_frames_callback(
        self,
        frames: List[av.AudioFrame],
    ) -> List[av.AudioFrame]:
        """Enqueue the batch as one mono buffer and return silent frames.

        All frames of the batch are concatenated, down-mixed to one channel
        and resampled to ``self._audio_bit_rate`` before being enqueued as a
        single numpy array of samples.

        Args:
            frames: Batch of audio frames delivered to the callback.

        Returns:
            One zero-filled frame per input frame, with the same shape,
            dtype, sample format, layout and sample rate, so that the
            incoming audio is not played back (avoids echo).
        """
        try:
            # Nothing is enqueued for an empty batch.
            if frames:
                sound_chunk = pydub.AudioSegment.empty()
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            # Best effort: a failure to enqueue must not break the stream.
            print (e)

        # return empty frames to avoid echo
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                # Without an explicit format PyAV assumes "s16" and rejects
                # arrays coming from frames in any other sample format.
                format=frame.format.name,
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)
        return new_frames

    def get_audio_frames(self) -> List:
        """Return the result of the actor's ``get_audio_frames`` call.

        Blocks until the remote call completes.
        NOTE(review): presumably the numpy buffers enqueued by
        ``queued_audio_frames_callback`` -- confirm against
        ``InputAVQueueActor``.
        """
        shared_buffers = ray.get(self.queue_actor.get_audio_frames.remote())
        return shared_buffers

    def get_video_frames(self) -> List:
        """Return the result of the actor's ``get_video_frames`` call.

        Blocks until the remote call completes.
        NOTE(review): presumably the tensors enqueued by
        ``queued_video_frames_callback`` -- confirm against
        ``InputAVQueueActor``.
        """
        shared_tensors = ray.get(self.queue_actor.get_video_frames.remote())
        return shared_tensors