from typing import List

import numpy as np
import ray
import torch
from ray.actor import ActorHandle
from ray.util.queue import Empty, Full, Queue

import pid_helper


@ray.remote
class AppInterfaceActor:
    """Ray actor that holds the queues and status shared by a UI and an app.

    The actor owns four bounded ``ray.util.queue.Queue`` instances (audio and
    video, input and output), a free-form debug string, a state string and a
    list of process ids registered through ``add_charles_app_pid``.
    """

    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue = Queue(maxsize=50)  # Adjust the size as needed
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named ``"AppInterfaceActor"``.

        ``get_if_exists=True`` makes Ray hand back the already-running named
        actor when there is one instead of creating a second instance.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # internal queue helpers

    async def _enqueue_dropping_oldest(self, queue: Queue, item) -> None:
        """Put ``item`` on ``queue``, evicting the oldest entry while it is full.

        Both the put and the eviction are non-blocking.  A separate ``full()``
        check followed by a blocking ``get_async()``/``put_async()`` could wait
        indefinitely if another call emptied or refilled the queue between the
        check and the operation.
        """
        while True:
            try:
                await queue.put_async(item, block=False)
                return
            except Full:
                try:
                    # Discard the oldest entry to make room for the new one.
                    await queue.get_async(block=False)
                except Empty:
                    # Someone else drained the queue in the meantime; the
                    # retry of the put will now succeed.
                    pass

    async def _drain(self, queue: Queue) -> list:
        """Return every item currently on ``queue`` without waiting for more."""
        items = []
        while True:
            try:
                items.append(await queue.get_async(block=False))
            except Empty:
                return items

    # functions for UI to enqueue input, dequeue output

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Add a video input frame, dropping the oldest one if the queue is full."""
        await self._enqueue_dropping_oldest(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Add an audio input frame, dropping the oldest one if the queue is full."""
        await self._enqueue_dropping_oldest(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return the next audio output frame, or ``None`` if none is queued."""
        try:
            return await self.audio_output_queue.get_async(block=False)
        except Empty:
            return None

    async def dequeue_video_output_frames_async(self):
        """Return all queued video output frames (an empty list if none)."""
        return await self._drain(self.video_output_queue)

    # functions for application to dequeue input, enqueue output

    def get_audio_output_queue(self) -> Queue:
        """Return the audio output queue object itself."""
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Add a video output frame, dropping the oldest one if the queue is full."""
        await self._enqueue_dropping_oldest(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self):
        """Return all queued audio input frames (an empty list if none)."""
        return await self._drain(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self):
        """Return all queued video input frames (an empty list if none)."""
        return await self._drain(self.video_input_queue)

    # pid helpers

    def _prune_dead_pids(self) -> List[int]:
        """Keep only the pids ``pid_helper.is_pid_running`` reports as running.

        Updates ``self.charles_app_pids`` in place and returns the new list.
        """
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]
        return self.charles_app_pids

    async def add_charles_app_pid(self, pid: int):
        """Register ``pid`` in the list of tracked app process ids."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self) -> List[int]:
        """Return the tracked pids that are still running (dead ones are dropped)."""
        return self._prune_dead_pids()

    async def is_charles_app_running(self) -> bool:
        """Return ``True`` if at least one tracked pid is still running."""
        return len(self._prune_dead_pids()) > 0

    # debug helpers

    async def get_debug_output(self) -> str:
        """Return the last debug string stored with ``set_debug_output``."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str):
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the current state string (``"Initializing"`` until changed)."""
        return self.state

    async def set_state(self, state: str):
        """Replace the stored state string."""
        self.state = state