import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


class WebRtcAVQueueActor:
    """Buffers WebRTC audio/video frame data between producers and consumers via Ray queues."""

    def __init__(self):
        # Bounded queues so a slow consumer cannot grow memory without limit.
        self.in_audio_queue = Queue(maxsize=100)  # Adjust the size as needed
        self.in_video_queue = Queue(maxsize=100)  # Adjust the size as needed
        self.out_audio_queue = Queue(maxsize=100)  # Adjust the size as needed

    async def enqueue_in_video_frame(self, shared_tensor_ref):
        # If the queue is full, evict the oldest frame so the newest one is never dropped.
        if self.in_video_queue.full():
            evicted_item = await self.in_video_queue.get_async()
            del evicted_item
        await self.in_video_queue.put_async(shared_tensor_ref)

    async def enqueue_in_audio_frame(self, shared_buffer_ref):
        # Same eviction policy as video: keep the queue bounded, favor fresh frames.
        if self.in_audio_queue.full():
            evicted_item = await self.in_audio_queue.get_async()
            del evicted_item
        await self.in_audio_queue.put_async(shared_buffer_ref)

    async def get_in_audio_frames(self):
        # Drain all buffered audio frames; returns an empty list if nothing is queued.
        audio_frames = []
        if self.in_audio_queue.empty():
            return audio_frames
        while not self.in_audio_queue.empty():
            shared_tensor_ref = await self.in_audio_queue.get_async()
            audio_frames.append(shared_tensor_ref)
        return audio_frames

    async def get_in_video_frames(self):
        # Drain all buffered video frames; returns an empty list if nothing is queued.
        video_frames = []
        if self.in_video_queue.empty():
            return video_frames
        while not self.in_video_queue.empty():
            shared_tensor_ref = await self.in_video_queue.get_async()
            video_frames.append(shared_tensor_ref)
        return video_frames

    def get_out_audio_queue(self):
        # Expose the outbound audio queue so producers can write to it directly.
        return self.out_audio_queue

    async def get_out_audio_frame(self):
        # Non-blocking read: return None when no outbound audio frame is available.
        if self.out_audio_queue.empty():
            return None
        audio_frame = await self.out_audio_queue.get_async()
        return audio_frame
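

# Minimal usage sketch, not part of the actor above: one way to create the actor and
# push/pull frames. The frame shape, asyncio wiring, and calling pattern here are
# illustrative assumptions, not taken from the original application.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        ray.init(ignore_reinit_error=True)

        # The class is not decorated, so apply ray.remote() at creation time.
        queue_actor = ray.remote(WebRtcAVQueueActor).remote()

        # Enqueue a placeholder video frame; Ray serializes it into the object store
        # when it is passed to the actor.
        frame = torch.zeros((480, 640, 3), dtype=torch.uint8)
        await queue_actor.enqueue_in_video_frame.remote(frame)

        # Drain whatever video frames are currently buffered.
        frames = await queue_actor.get_in_video_frames.remote()
        print(f"received {len(frames)} video frame(s)")

        ray.shutdown()

    asyncio.run(_demo())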