Spaces:
Runtime error
Runtime error
File size: 5,288 Bytes
a642a9f ac35a95 a642a9f ac35a95 2d6aefc ac35a95 a642a9f ac35a95 72e4889 ac35a95 aec6f97 a642a9f aec6f97 c490c32 2d6aefc d91a673 72e4889 ac35a95 28b5e08 aec6f97 ac35a95 aec6f97 ac35a95 aec6f97 ac35a95 aec6f97 ac35a95 28b5e08 a642a9f 28b5e08 a642a9f 28b5e08 a642a9f 28b5e08 ac35a95 aec6f97 ac35a95 aec6f97 28b5e08 ac35a95 28b5e08 ac35a95 aec6f97 ac35a95 aec6f97 28b5e08 c490c32 2d6aefc c490c32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import time
import ray
from ray.util.queue import Queue, Empty
from ray.actor import ActorHandle
import numpy as np
import pid_helper
# Ray Queues take ~0.5 seconds to spin up;
# this class creates a pool of queues to cycle through
class QueueFactory:
    """Pool of pre-created queues that are handed out one at a time.

    The pool is filled when the factory is constructed, and every call to
    ``get_queue`` appends a fresh queue to replace the one it hands out, so
    the pool always holds ``queue_size`` queues.
    """

    def __init__(self, max_size: int, pool_size: int = 5):
        """Create the pool.

        Args:
            max_size: ``maxsize`` passed to every queue the factory creates.
            pool_size: Number of queues kept ready in the pool (default 5).

        Raises:
            ValueError: If ``pool_size`` is smaller than 1; ``get_queue``
                needs at least one pooled queue to hand out.
        """
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self.max_size = max_size
        self.queue_size = pool_size
        self.queues: list[Queue] = [
            Queue(maxsize=max_size) for _ in range(pool_size)
        ]

    def get_queue(self) -> Queue:
        """Return the oldest pooled queue and add a new one to the pool."""
        queue = self.queues.pop(0)
        # Replenish immediately so the pool size stays constant.
        self.queues.append(Queue(maxsize=self.max_size))
        return queue
@ray.remote
class AppInterfaceActor:
    """Ray actor holding the queues and status shared by the UI and the app.

    The UI enqueues audio/video input and dequeues audio/video output; the
    application dequeues that input and enqueues output. The actor also
    stores a debug string, a state string and a list of registered pids.
    """

    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue_factory = QueueFactory(max_size=50)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named "AppInterfaceActor".

        ``get_if_exists=True`` is passed so an existing actor with that name
        is requested instead of always creating a new one.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # internal queue helpers
    async def _enqueue_with_eviction(self, queue, item):
        """Put ``item`` on ``queue``, dropping one queued item first if full."""
        if queue.full():
            try:
                # Non-blocking: if the queue was emptied since the full()
                # check there is nothing to evict and we must not wait.
                await queue.get_async(block=False)
            except Empty:
                pass
        await queue.put_async(item)

    async def _drain_queue(self, queue):
        """Return every item currently available on ``queue``, in order."""
        items = []
        while True:
            try:
                # Non-blocking so a concurrent consumer cannot leave this
                # call waiting on an already-empty queue.
                items.append(await queue.get_async(block=False))
            except Empty:
                return items

    # functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Queue a video input frame, evicting one queued frame if full."""
        await self._enqueue_with_eviction(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Queue an audio input frame, evicting one queued frame if full."""
        await self._enqueue_with_eviction(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return the next audio output frame, or ``None`` if none is queued.

        Prints a timing line when the dequeue took longer than 0.1 seconds.
        """
        start_time = time.monotonic()
        try:
            frame = await self.audio_output_queue.get_async(block=False)
        except Empty:
            frame = None
        elapsed_time = time.monotonic() - start_time
        if elapsed_time > 0.1:
            # Compare against None explicitly: truth-testing the frame would
            # raise for multi-element numpy arrays and mislabel falsy frames.
            frame_type = type(frame) if frame is not None else str(0)
            print(f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {frame_type}")
        return frame

    async def dequeue_video_output_frames_async(self):
        """Return all currently queued video output frames (possibly empty)."""
        return await self._drain_queue(self.video_output_queue)

    # functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self) -> Queue:
        """Return the current audio output queue."""
        return self.audio_output_queue

    async def cycle_output_queue(self) -> Queue:
        """Shut down the current audio output queue and switch to a new one.

        Returns:
            The replacement queue taken from the queue factory.
        """
        self.audio_output_queue.shutdown(grace_period_s=0.1)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Queue a video output frame, evicting one queued frame if full."""
        await self._enqueue_with_eviction(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self):
        """Return all currently queued audio input frames (possibly empty)."""
        return await self._drain_queue(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self):
        """Return all currently queued video input frames (possibly empty)."""
        return await self._drain_queue(self.video_input_queue)

    # pid helpers
    def _prune_dead_pids(self):
        """Keep only the pids that pid_helper.is_pid_running reports as running."""
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]

    async def add_charles_app_pid(self, pid: int):
        """Register ``pid`` in the list of tracked pids."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self) -> "list[int]":
        """Return the tracked pids after pruning those no longer running."""
        self._prune_dead_pids()
        return self.charles_app_pids

    async def is_charles_app_running(self) -> bool:
        """Return True if at least one tracked pid is still running."""
        self._prune_dead_pids()
        return len(self.charles_app_pids) > 0

    # debug helpers
    async def get_debug_output(self) -> str:
        """Return the stored debug string."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str):
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the stored state string."""
        return self.state

    async def set_state(self, state: str):
        """Replace the stored state string."""
        self.state = state