# project_charles / app_interface_actor.py
# Repository page residue (not part of the module): last commit 2d6aefc,
# "fix: ui_app will start charles_app when not in debug mode"; file size 4.21 kB.
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np
import pid_helper
@ray.remote
class AppInterfaceActor:
    """Ray actor that passes audio/video frames, state and debug text between
    the UI and the application.

    The actor owns four bounded ``ray.util.queue.Queue`` instances (audio and
    video, input and output), a debug string, a state string and a list of
    application process ids.
    """

    def __init__(self):
        """Create the four bounded queues and the initial bookkeeping fields."""
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue = Queue(maxsize=50)  # Adjust the size as needed
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named ``"AppInterfaceActor"``.

        ``get_if_exists=True`` asks Ray to reuse an existing actor with that
        name and only create a new one when none exists.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # internal queue helpers

    async def _enqueue_evicting_one(self, queue, item):
        """Put *item* on *queue*, first removing one queued item if it is full.

        NOTE(review): ``full()`` and ``get_async()`` are separate calls; if
        something else empties the queue in between, ``get_async()`` waits for
        a new item. Confirm callers cannot interleave that way.
        """
        if queue.full():
            # Make room: take one item off the queue and discard it.
            await queue.get_async()
        await queue.put_async(item)

    async def _drain(self, queue):
        """Return a list of every item taken from *queue* until it reports empty.

        The list is empty when the queue has nothing in it.
        """
        items = []
        while not queue.empty():
            items.append(await queue.get_async())
        return items

    # functions for UI to enqueue input, dequeue output

    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Add a video input frame, discarding one queued frame if full."""
        await self._enqueue_evicting_one(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Add an audio input frame, discarding one queued frame if full."""
        await self._enqueue_evicting_one(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return one audio output frame, or ``None`` when the queue is empty."""
        if self.audio_output_queue.empty():
            return None
        return await self.audio_output_queue.get_async()

    async def dequeue_video_output_frames_async(self):
        """Return all queued video output frames as a list (possibly empty)."""
        return await self._drain(self.video_output_queue)

    # functions for application to dequeue input, enqueue output

    def get_audio_output_queue(self) -> Queue:
        """Return the audio output queue object itself."""
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Add a video output frame, discarding one queued frame if full."""
        await self._enqueue_evicting_one(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self):
        """Return all queued audio input frames as a list (possibly empty)."""
        return await self._drain(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self):
        """Return all queued video input frames as a list (possibly empty)."""
        return await self._drain(self.video_input_queue)

    # pid helpers

    def _prune_dead_pids(self):
        """Keep only pids that ``pid_helper.is_pid_running`` reports as running.

        Rebinds ``self.charles_app_pids`` to the filtered list and returns it.
        """
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]
        return self.charles_app_pids

    async def add_charles_app_pid(self, pid: int):
        """Record *pid* in the list of application process ids."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self) -> list:
        """Return the recorded pids (a list of ints) after pruning dead ones."""
        return self._prune_dead_pids()

    async def is_charles_app_running(self) -> bool:
        """Return ``True`` if at least one recorded pid is still running."""
        return len(self._prune_dead_pids()) > 0

    # debug helpers

    async def get_debug_output(self) -> str:
        """Return the stored debug string."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str):
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the stored state string (``"Initializing"`` until changed)."""
        return self.state

    async def set_state(self, state: str):
        """Replace the stored state string."""
        self.state = state