File size: 4,210 Bytes
ac35a95
 
 
 
 
2d6aefc
ac35a95
 
72e4889
ac35a95
aec6f97
 
2bb91de
aec6f97
c490c32
 
2d6aefc
d91a673
72e4889
 
 
 
 
 
ac35a95
28b5e08
aec6f97
 
 
ac35a95
aec6f97
ac35a95
aec6f97
 
 
ac35a95
aec6f97
ac35a95
28b5e08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
ac35a95
 
28b5e08
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
c490c32
2d6aefc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c490c32
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np
import pid_helper

@ray.remote
class AppInterfaceActor:
    """Ray actor that relays audio/video frames between the UI and the application.

    The actor owns four bounded Ray queues (audio/video input and audio/video
    output), a free-form debug string, a state string that starts as
    ``"Initializing"``, and the list of registered application PIDs.
    """

    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue = Queue(maxsize=50)  # Adjust the size as needed
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        """Return a handle to the actor named ``"AppInterfaceActor"``.

        The handle is requested with ``get_if_exists=True``, so the named
        actor is reused when it already exists instead of being created again.
        """
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

    # internal queue helpers
    async def _enqueue_evicting_oldest(self, queue: Queue, item) -> None:
        """Put ``item`` on ``queue``, first removing one item if the queue is full."""
        if queue.full():
            # Make room by discarding the item at the head of the queue; the
            # evicted value is intentionally not kept.
            await queue.get_async()
        await queue.put_async(item)

    async def _drain_queue(self, queue: Queue) -> list:
        """Remove items from ``queue`` until it reports empty and return them in order."""
        items = []
        while not queue.empty():
            items.append(await queue.get_async())
        return items

    # functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref) -> None:
        """Add a frame to the video input queue, evicting one item if it is full."""
        await self._enqueue_evicting_oldest(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref) -> None:
        """Add a frame to the audio input queue, evicting one item if it is full."""
        await self._enqueue_evicting_oldest(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return one item from the audio output queue, or ``None`` if it is empty."""
        if self.audio_output_queue.empty():
            return None
        return await self.audio_output_queue.get_async()

    async def dequeue_video_output_frames_async(self) -> list:
        """Return all items currently in the video output queue (possibly ``[]``)."""
        return await self._drain_queue(self.video_output_queue)

    # functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self) -> Queue:
        """Return the audio output queue object itself."""
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref) -> None:
        """Add a frame to the video output queue, evicting one item if it is full."""
        await self._enqueue_evicting_oldest(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self) -> list:
        """Return all items currently in the audio input queue (possibly ``[]``)."""
        return await self._drain_queue(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self) -> list:
        """Return all items currently in the video input queue (possibly ``[]``)."""
        return await self._drain_queue(self.video_input_queue)

    # pid helpers
    def _prune_dead_pids(self) -> None:
        """Keep only the registered PIDs that ``pid_helper.is_pid_running`` accepts."""
        self.charles_app_pids = [
            pid for pid in self.charles_app_pids if pid_helper.is_pid_running(pid)
        ]

    async def add_charles_app_pid(self, pid: int) -> None:
        """Register ``pid`` in the list of application PIDs."""
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self) -> "list[int]":
        """Prune PIDs that are no longer running and return the remaining list."""
        self._prune_dead_pids()
        return self.charles_app_pids

    async def is_charles_app_running(self) -> bool:
        """Prune PIDs that are no longer running and report whether any remain."""
        self._prune_dead_pids()
        return len(self.charles_app_pids) > 0

    # debug helpers
    async def get_debug_output(self) -> str:
        """Return the stored debug string."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str) -> None:
        """Replace the stored debug string."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the stored state string."""
        return self.state

    async def set_state(self, state: str) -> None:
        """Replace the stored state string."""
        self.state = state