Spaces:
Sleeping
Sleeping
import ray | |
from ray.util.queue import Queue | |
from ray.actor import ActorHandle | |
import torch | |
import numpy as np | |
class AppInterfaceActor: | |
def __init__(self): | |
self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed | |
self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed | |
self.audio_output_queue = Queue(maxsize=3000) # Adjust the size as needed | |
self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed | |
def get_singleton(): | |
return AppInterfaceActor.options( | |
name="AppInterfaceActor", | |
get_if_exists=True, | |
).remote() | |
# functions for UI to enqueue input, dequeue output | |
async def enqueue_video_input_frame(self, shared_tensor_ref): | |
if self.video_input_queue.full(): | |
evicted_item = await self.video_input_queue.get_async() | |
del evicted_item | |
await self.video_input_queue.put_async(shared_tensor_ref) | |
async def enqueue_audio_input_frame(self, shared_buffer_ref): | |
if self.audio_input_queue.full(): | |
evicted_item = await self.audio_input_queue.get_async() | |
del evicted_item | |
await self.audio_input_queue.put_async(shared_buffer_ref) | |
async def dequeue_audio_output_frame_async(self): | |
if self.audio_output_queue.empty(): | |
return None | |
frame = await self.audio_output_queue.get_async() | |
return frame | |
async def dequeue_video_output_frames_async(self): | |
video_frames = [] | |
if self.video_output_queue.empty(): | |
return video_frames | |
while not self.video_output_queue.empty(): | |
shared_tensor = await self.video_output_queue.get_async() | |
video_frames.append(shared_tensor) | |
return video_frames | |
# functions for application to dequeue input, enqueue output | |
def get_audio_output_queue(self)->Queue: | |
return self.audio_output_queue | |
async def enqueue_video_output_frame(self, shared_tensor_ref): | |
if self.video_output_queue.full(): | |
evicted_item = await self.video_output_queue.get_async() | |
del evicted_item | |
await self.video_output_queue.put_async(shared_tensor_ref) | |
async def dequeue_audio_input_frames_async(self): | |
audio_frames = [] | |
if self.audio_input_queue.empty(): | |
return audio_frames | |
while not self.audio_input_queue.empty(): | |
shared_tensor = await self.audio_input_queue.get_async() | |
audio_frames.append(shared_tensor) | |
return audio_frames | |
async def dequeue_video_input_frames_async(self): | |
video_frames = [] | |
if self.video_input_queue.empty(): | |
return video_frames | |
while not self.video_input_queue.empty(): | |
shared_tensor = await self.video_input_queue.get_async() | |
video_frames.append(shared_tensor) | |
return video_frames |