Spaces:
Runtime error
Runtime error
from typing import List | |
import av | |
import asyncio | |
from collections import deque | |
import threading | |
import numpy as np | |
class StreamlitAVQueue: | |
def __init__(self): | |
self.audio_frames_deque_lock = threading.Lock() | |
self.audio_frames_deque: deque = deque([]) | |
self.video_frames_deque_lock = threading.Lock() | |
self.video_frames_deque: deque = deque([]) | |
async def queued_video_frames_callback( | |
self, | |
frames: List[av.AudioFrame], | |
) -> av.AudioFrame: | |
with self.video_frames_deque_lock: | |
self.video_frames_deque.extend(frames) | |
return frames | |
async def queued_audio_frames_callback( | |
self, | |
frames: List[av.AudioFrame], | |
) -> av.AudioFrame: | |
with self.audio_frames_deque_lock: | |
self.audio_frames_deque.extend(frames) | |
# return empty frames to avoid echo | |
new_frames = [] | |
for frame in frames: | |
input_array = frame.to_ndarray() | |
new_frame = av.AudioFrame.from_ndarray( | |
np.zeros(input_array.shape, dtype=input_array.dtype), | |
layout=frame.layout.name, | |
) | |
new_frame.sample_rate = frame.sample_rate | |
new_frames.append(new_frame) | |
return new_frames | |
def get_audio_frames(self) -> List[av.AudioFrame]: | |
audio_frames = [] | |
with self.audio_frames_deque_lock: | |
audio_frames = list(self.audio_frames_deque) | |
self.audio_frames_deque.clear() | |
return audio_frames | |
def get_video_frames(self) -> List[av.AudioFrame]: | |
video_frames = [] | |
with self.video_frames_deque_lock: | |
video_frames = list(self.video_frames_deque) | |
self.video_frames_deque.clear() | |
return video_frames | |