import time
from typing import List

import ray
from ray.util.queue import Queue, Empty
from ray.actor import ActorHandle
import numpy as np

import pid_helper


async def _put_evicting_oldest_async(queue: Queue, item) -> None:
    """Put *item* on *queue*, dropping the oldest entry first if it is full.

    The eviction uses a non-blocking get: if a consumer empties the queue
    between the ``full()`` check and the get, there is nothing to evict and
    we must not wait for an item that may never arrive.
    """
    if queue.full():
        try:
            await queue.get_async(block=False)
        except Empty:
            pass  # queue was drained concurrently; room is already available
    await queue.put_async(item)


async def _drain_queue_async(queue: Queue) -> list:
    """Return every item currently retrievable from *queue*, oldest first.

    Items are fetched with non-blocking gets until the queue reports
    ``Empty``, so the call returns (possibly with an empty list) instead of
    waiting when a concurrent consumer empties the queue.
    """
    items = []
    while True:
        try:
            items.append(await queue.get_async(block=False))
        except Empty:
            return items


# Ray Queues take ~.5 seconds to spin up;
# this class creates a pool of queues to cycle through
class QueueFactory:
    """Pool of pre-created Ray queues handed out one at a time.

    Args:
        max_size: ``maxsize`` passed to every ``Queue`` the factory creates.
        queue_size: number of queues kept in the pool (defaults to 5).

    Raises:
        ValueError: if ``queue_size`` is less than 1.
    """

    def __init__(self, max_size: int, queue_size: int = 5):
        if queue_size < 1:
            raise ValueError("queue_size must be at least 1")
        self.queues: List[Queue] = []
        self.queue_size = queue_size
        self.max_size = max_size
        while len(self.queues) < self.queue_size:
            self.queues.append(Queue(maxsize=max_size))

    def get_queue(self) -> Queue:
        """Hand out the oldest pooled queue and append a fresh replacement."""
        queue = self.queues.pop(0)
        self.queues.append(Queue(maxsize=self.max_size))
        return queue


@ray.remote
class AppInterfaceActor:
    """Named Ray actor holding the audio/video queues shared by UI and app.

    It also stores a debug string, a state string and the list of
    registered application pids.
    """

    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue_factory = QueueFactory(max_size=50)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named "AppInterfaceActor".

        ``get_if_exists=True`` is passed so an existing actor with that name
        is reused rather than a second one being created.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Queue a video input frame, evicting the oldest one if full."""
        await _put_evicting_oldest_async(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Queue an audio input frame, evicting the oldest one if full."""
        await _put_evicting_oldest_async(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return the next audio output frame, or ``None`` if none is queued.

        Prints a diagnostic line when the dequeue takes longer than 0.1 s.
        """
        # monotonic clock: elapsed time is unaffected by wall-clock changes
        start_time = time.monotonic()
        try:
            frame = await self.audio_output_queue.get_async(block=False)
        except Empty:
            frame = None
        elapsed_time = time.monotonic() - start_time
        if elapsed_time > 0.1:
            # `is not None` rather than truthiness: array-like frames may not
            # support bool(), and falsy frames are still real frames.
            frame_type = type(frame) if frame is not None else str(0)
            print(
                f"dequeue_audio_output_frame_async time: {elapsed_time}. "
                f"was empty: {frame is None}. frame type: {frame_type}"
            )
        return frame

    async def dequeue_video_output_frames_async(self) -> list:
        """Return all queued video output frames (empty list if none)."""
        return await _drain_queue_async(self.video_output_queue)

    # functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self) -> Queue:
        """Return the audio output queue currently in use."""
        return self.audio_output_queue

    async def cycle_output_queue(self) -> Queue:
        """Shut down the current audio output queue and switch to a new one.

        Returns:
            The replacement queue taken from the queue factory.
        """
        self.audio_output_queue.shutdown(grace_period_s=0.1)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Queue a video output frame, evicting the oldest one if full."""
        await _put_evicting_oldest_async(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self) -> list:
        """Return all queued audio input frames (empty list if none)."""
        return await _drain_queue_async(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self) -> list:
        """Return all queued video input frames (empty list if none)."""
        return await _drain_queue_async(self.video_input_queue)

    # pid helpers
    def _prune_dead_pids(self) -> None:
        """Keep only the pids that ``pid_helper.is_pid_running`` accepts."""
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]

    async def add_charles_app_pid(self, pid: int):
        """Register *pid* in the list of application pids."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self) -> List[int]:
        """Return the registered pids after pruning dead ones."""
        self._prune_dead_pids()
        return self.charles_app_pids

    async def is_charles_app_running(self) -> bool:
        """Return True if at least one registered pid is still running."""
        self._prune_dead_pids()
        return len(self.charles_app_pids) > 0

    # debug helpers
    async def get_debug_output(self) -> str:
        """Return the stored debug string."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str):
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the stored state string."""
        return self.state

    async def set_state(self, state: str):
        """Replace the stored state string."""
        self.state = state