File size: 5,288 Bytes
a642a9f
ac35a95
a642a9f
ac35a95
 
2d6aefc
ac35a95
a642a9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac35a95
72e4889
ac35a95
aec6f97
 
a642a9f
 
aec6f97
c490c32
 
2d6aefc
d91a673
72e4889
 
 
 
 
 
ac35a95
28b5e08
aec6f97
 
 
ac35a95
aec6f97
ac35a95
aec6f97
 
 
ac35a95
aec6f97
ac35a95
28b5e08
a642a9f
 
 
 
 
 
 
 
28b5e08
 
 
 
 
 
 
 
 
 
 
a642a9f
28b5e08
 
a642a9f
 
 
 
 
28b5e08
 
 
 
 
 
 
 
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
ac35a95
 
28b5e08
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
c490c32
2d6aefc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c490c32
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import time
import ray
from ray.util.queue import Queue, Empty
from ray.actor import ActorHandle
import numpy as np
import pid_helper

# Ray Queue's take ~.5 seconds to splin up; 
# this class creates a pool of queues to cycle through
class QueueFactory:
    def __init__(self, max_size:int):
        self.queues:[Queue] = []
        self.queue_size = 5
        self.max_size = max_size
        while len(self.queues) < self.queue_size:
            self.queues.append(Queue(maxsize=max_size))

    def get_queue(self)->Queue:
        queue = self.queues.pop(0)
        self.queues.append(Queue(maxsize=self.max_size))
        return queue
        
@ray.remote
class AppInterfaceActor:
    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue_factory = QueueFactory(max_size=50)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

# functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref):
        if self.video_input_queue.full():
            evicted_item = await self.video_input_queue.get_async()
            del evicted_item
        await self.video_input_queue.put_async(shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        if self.audio_input_queue.full():
            evicted_item = await self.audio_input_queue.get_async()
            del evicted_item
        await self.audio_input_queue.put_async(shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        start_time = time.time()
        try:
            frame = await self.audio_output_queue.get_async(block=False)
        except Empty:
            frame = None
        elapsed_time = time.time() - start_time
        if elapsed_time > 0.1:
            print (f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {type(frame) if frame else str(0)}")
        return frame

    async def dequeue_video_output_frames_async(self):
        video_frames = []
        if self.video_output_queue.empty():
            return video_frames
        while not self.video_output_queue.empty():
            shared_tensor = await self.video_output_queue.get_async()
            video_frames.append(shared_tensor)
        return video_frames

# functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self)->Queue:
        return self.audio_output_queue

    async def cycle_output_queue(self)->Queue:
        self.audio_output_queue.shutdown(grace_period_s=0.1)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        return self.audio_output_queue
    
    async def enqueue_video_output_frame(self, shared_tensor_ref):
        if self.video_output_queue.full():
            evicted_item = await self.video_output_queue.get_async()
            del evicted_item
        await self.video_output_queue.put_async(shared_tensor_ref)
    
    async def dequeue_audio_input_frames_async(self):
        audio_frames = []
        if self.audio_input_queue.empty():
            return audio_frames
        while not self.audio_input_queue.empty():
            shared_tensor = await self.audio_input_queue.get_async()
            audio_frames.append(shared_tensor)
        return audio_frames

    async def dequeue_video_input_frames_async(self):
        video_frames = []
        if self.video_input_queue.empty():
            return video_frames
        while not self.video_input_queue.empty():
            shared_tensor = await self.video_input_queue.get_async()
            video_frames.append(shared_tensor)
        return video_frames

# pid helpers
    async def add_charles_app_pid(self, pid:int):
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self)->[int]:
        # prune dead pids
        running_charles_app_pids = []
        for pid in self.charles_app_pids:
            if pid_helper.is_pid_running(pid):
                running_charles_app_pids.append(pid)
        self.charles_app_pids = running_charles_app_pids
        return self.charles_app_pids

    async def is_charles_app_running(self)->bool:
        # prune dead pids
        running_charles_app_pids = []
        for pid in self.charles_app_pids:
            if pid_helper.is_pid_running(pid):
                running_charles_app_pids.append(pid)
        self.charles_app_pids = running_charles_app_pids
        return len(self.charles_app_pids) > 0

# debug helpers
    async def get_debug_output(self)->str:
        return self.debug_str
    
    async def set_debug_output(self, debug_str:str):
        self.debug_str = debug_str

    async def get_state(self)->str:
        return self.state
    
    async def set_state(self, state:str):
        self.state = state