File size: 3,347 Bytes
ac35a95
 
 
 
 
 
 
 
72e4889
ac35a95
aec6f97
 
 
 
c490c32
 
d91a673
72e4889
 
 
 
 
 
ac35a95
28b5e08
aec6f97
 
 
ac35a95
aec6f97
ac35a95
aec6f97
 
 
ac35a95
aec6f97
ac35a95
28b5e08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
ac35a95
 
28b5e08
ac35a95
aec6f97
ac35a95
aec6f97
28b5e08
 
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


@ray.remote
class AppInterfaceActor:
    """Singleton Ray actor bridging a UI process and the application.

    Holds four bounded Ray queues: audio/video *input* (UI -> app) and
    audio/video *output* (app -> UI). Enqueue methods evict the oldest
    item when a queue is full so producers never block; the drain methods
    return everything currently buffered without blocking. Also carries a
    debug string and a state label for the UI to poll.
    """

    def __init__(self, audio_queue_maxsize: int = 3000, video_queue_maxsize: int = 10):
        """Create the four bounded queues and reset debug/state fields.

        Args:
            audio_queue_maxsize: capacity of the audio input/output queues
                (default 3000, matching the original hard-coded size).
            video_queue_maxsize: capacity of the video input/output queues
                (default 10, matching the original hard-coded size).
        """
        self.audio_input_queue = Queue(maxsize=audio_queue_maxsize)
        self.video_input_queue = Queue(maxsize=video_queue_maxsize)
        self.audio_output_queue = Queue(maxsize=audio_queue_maxsize)
        self.video_output_queue = Queue(maxsize=video_queue_maxsize)
        self.debug_str = ""
        self.state = "Initializing"

    @staticmethod
    def get_singleton():
        """Return the process-wide named actor handle, creating it on first use."""
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

# internal helpers

    @staticmethod
    async def _put_evicting_async(queue: Queue, item) -> None:
        """Put `item` on `queue`, dropping the oldest entry first if full.

        NOTE(review): the full() -> get_async() sequence is not atomic;
        presumably safe because each queue has a single producer/consumer
        pair through this actor — confirm before adding more consumers.
        """
        if queue.full():
            # Discard the oldest buffered item to make room (drop-oldest policy).
            await queue.get_async()
        await queue.put_async(item)

    @staticmethod
    async def _drain_async(queue: Queue) -> list:
        """Return every item currently buffered on `queue` (may be empty)."""
        items = []
        while not queue.empty():
            items.append(await queue.get_async())
        return items

# functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref):
        """Queue a video frame from the UI, evicting the oldest frame if full."""
        await self._put_evicting_async(self.video_input_queue, shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        """Queue an audio buffer from the UI, evicting the oldest if full."""
        await self._put_evicting_async(self.audio_input_queue, shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        """Return one audio output frame, or None when nothing is buffered."""
        if self.audio_output_queue.empty():
            return None
        return await self.audio_output_queue.get_async()

    async def dequeue_video_output_frames_async(self):
        """Return all buffered video output frames (empty list if none)."""
        return await self._drain_async(self.video_output_queue)

# functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self) -> Queue:
        """Expose the audio output queue so the app can push frames directly."""
        return self.audio_output_queue

    async def enqueue_video_output_frame(self, shared_tensor_ref):
        """Queue a video frame from the app, evicting the oldest frame if full."""
        await self._put_evicting_async(self.video_output_queue, shared_tensor_ref)

    async def dequeue_audio_input_frames_async(self):
        """Return all buffered audio input frames (empty list if none)."""
        return await self._drain_async(self.audio_input_queue)

    async def dequeue_video_input_frames_async(self):
        """Return all buffered video input frames (empty list if none)."""
        return await self._drain_async(self.video_input_queue)

# debug helpers
    async def get_debug_output(self) -> str:
        """Return the last debug string set via set_debug_output."""
        return self.debug_str

    async def set_debug_output(self, debug_str: str):
        """Store a debug string for the UI to display."""
        self.debug_str = debug_str

    async def get_state(self) -> str:
        """Return the current state label (initially "Initializing")."""
        return self.state

    async def set_state(self, state: str):
        """Set the current state label."""
        self.state = state