Spaces:
Sleeping
Sleeping
File size: 4,210 Bytes
ac35a95 2d6aefc ac35a95 72e4889 ac35a95 aec6f97 2bb91de aec6f97 c490c32 2d6aefc d91a673 72e4889 ac35a95 28b5e08 aec6f97 ac35a95 aec6f97 ac35a95 aec6f97 ac35a95 aec6f97 ac35a95 28b5e08 ac35a95 aec6f97 ac35a95 aec6f97 28b5e08 ac35a95 28b5e08 ac35a95 aec6f97 ac35a95 aec6f97 28b5e08 c490c32 2d6aefc c490c32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np
import pid_helper
@ray.remote
class AppInterfaceActor:
def __init__(self):
self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
self.audio_output_queue = Queue(maxsize=50) # Adjust the size as needed
self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
self.debug_str = ""
self.state = "Initializing"
self.charles_app_pids = []
@staticmethod
def get_singleton():
return AppInterfaceActor.options(
name="AppInterfaceActor",
get_if_exists=True,
).remote()
# functions for UI to enqueue input, dequeue output
async def enqueue_video_input_frame(self, shared_tensor_ref):
if self.video_input_queue.full():
evicted_item = await self.video_input_queue.get_async()
del evicted_item
await self.video_input_queue.put_async(shared_tensor_ref)
async def enqueue_audio_input_frame(self, shared_buffer_ref):
if self.audio_input_queue.full():
evicted_item = await self.audio_input_queue.get_async()
del evicted_item
await self.audio_input_queue.put_async(shared_buffer_ref)
async def dequeue_audio_output_frame_async(self):
if self.audio_output_queue.empty():
return None
frame = await self.audio_output_queue.get_async()
return frame
async def dequeue_video_output_frames_async(self):
video_frames = []
if self.video_output_queue.empty():
return video_frames
while not self.video_output_queue.empty():
shared_tensor = await self.video_output_queue.get_async()
video_frames.append(shared_tensor)
return video_frames
# functions for application to dequeue input, enqueue output
def get_audio_output_queue(self)->Queue:
return self.audio_output_queue
async def enqueue_video_output_frame(self, shared_tensor_ref):
if self.video_output_queue.full():
evicted_item = await self.video_output_queue.get_async()
del evicted_item
await self.video_output_queue.put_async(shared_tensor_ref)
async def dequeue_audio_input_frames_async(self):
audio_frames = []
if self.audio_input_queue.empty():
return audio_frames
while not self.audio_input_queue.empty():
shared_tensor = await self.audio_input_queue.get_async()
audio_frames.append(shared_tensor)
return audio_frames
async def dequeue_video_input_frames_async(self):
video_frames = []
if self.video_input_queue.empty():
return video_frames
while not self.video_input_queue.empty():
shared_tensor = await self.video_input_queue.get_async()
video_frames.append(shared_tensor)
return video_frames
# pid helpers
async def add_charles_app_pid(self, pid:int):
self.charles_app_pids.append(pid)
async def get_charles_app_pids(self)->[int]:
# prune dead pids
running_charles_app_pids = []
for pid in self.charles_app_pids:
if pid_helper.is_pid_running(pid):
running_charles_app_pids.append(pid)
self.charles_app_pids = running_charles_app_pids
return self.charles_app_pids
async def is_charles_app_running(self)->bool:
# prune dead pids
running_charles_app_pids = []
for pid in self.charles_app_pids:
if pid_helper.is_pid_running(pid):
running_charles_app_pids.append(pid)
self.charles_app_pids = running_charles_app_pids
return len(self.charles_app_pids) > 0
# debug helpers
async def get_debug_output(self)->str:
return self.debug_str
async def set_debug_output(self, debug_str:str):
self.debug_str = debug_str
async def get_state(self)->str:
return self.state
async def set_state(self, state:str):
self.state = state |