File size: 2,290 Bytes
ac35a95
 
 
 
 
 
 
 
 
 
cf5e7f4
 
 
 
d91a673
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
0d27fd9
ac35a95
 
 
 
0d27fd9
ac35a95
 
 
0d27fd9
ac35a95
 
 
 
0d27fd9
ac35a95
 
d91a673
cf5e7f4
d91a673
 
cf5e7f4
 
 
d91a673
 
 
cf5e7f4
 
 
 
 
 
795c382
cf5e7f4
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import numpy as np
import ray
import torch
from ray.actor import ActorHandle
from ray.util.queue import Empty, Full, Queue


@ray.remote
class WebRtcAVQueueActor:
    """Ray actor that buffers inbound and outbound audio/video frames.

    The actor owns four bounded ``ray.util.queue.Queue`` objects: an inbound
    and an outbound queue for audio (capacity 3000 each) and for video
    (capacity 10 each). Inbound enqueues never wait for space: when a queue
    is full the oldest item is discarded to make room.

    All queue access uses the non-blocking (``block=False``) variants of
    ``get_async``/``put_async``. Because this actor has ``async`` methods,
    several calls can interleave at every ``await``; a separate
    ``empty()``/``full()`` check followed by a blocking get/put could
    therefore wait indefinitely if another call changed the queue in
    between. Handling ``Empty``/``Full`` instead makes each step atomic from
    the caller's point of view and avoids the synchronous ``empty()`` /
    ``full()`` round-trips inside coroutines.
    """

    def __init__(self):
        self.in_audio_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.in_video_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.out_audio_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.out_video_queue = Queue(maxsize=10)  # Adjust the size as needed

    async def _put_dropping_oldest(self, frame_queue, item):
        """Put ``item`` on ``frame_queue``, evicting the oldest entries if full.

        Args:
            frame_queue: The ``Queue`` to append to.
            item: The object to enqueue; it is stored as-is.
        """
        while True:
            try:
                await frame_queue.put_async(item, block=False)
                return
            except Full:
                # Make room by discarding the oldest entry, then retry.
                try:
                    await frame_queue.get_async(block=False)
                except Empty:
                    # A consumer emptied the queue in the meantime; the
                    # retry below will now find free space.
                    pass

    async def _drain(self, frame_queue):
        """Remove and return everything currently in ``frame_queue``.

        Args:
            frame_queue: The ``Queue`` to empty.

        Returns:
            A list of the dequeued items, oldest first; empty if the queue
            held nothing.
        """
        drained = []
        while True:
            try:
                drained.append(await frame_queue.get_async(block=False))
            except Empty:
                return drained

    async def enqueue_in_video_frame(self, shared_tensor_ref):
        """Append an item to the inbound video queue, dropping the oldest if full."""
        await self._put_dropping_oldest(self.in_video_queue, shared_tensor_ref)

    async def enqueue_in_audio_frame(self, shared_buffer_ref):
        """Append an item to the inbound audio queue, dropping the oldest if full."""
        await self._put_dropping_oldest(self.in_audio_queue, shared_buffer_ref)

    async def get_in_audio_frames(self):
        """Return all pending inbound audio items (oldest first), emptying the queue."""
        return await self._drain(self.in_audio_queue)

    async def get_in_video_frames(self):
        """Return all pending inbound video items (oldest first), emptying the queue."""
        return await self._drain(self.in_video_queue)

    def get_out_audio_queue(self) -> Queue:
        """Return the outbound audio ``Queue`` object itself."""
        return self.out_audio_queue

    def get_out_video_queue(self) -> Queue:
        """Return the outbound video ``Queue`` object itself."""
        return self.out_video_queue

    async def get_out_audio_frame(self):
        """Return the next outbound audio item, or ``None`` if none is queued."""
        try:
            return await self.out_audio_queue.get_async(block=False)
        except Empty:
            return None

    async def get_out_video_frame(self):
        """Return the newest outbound video item, discarding any older ones.

        Returns:
            The most recently queued item, or ``None`` if the queue was empty.
        """
        latest = None
        while True:
            try:
                # Each successful get supersedes the previous (older) item.
                latest = await self.out_video_queue.get_async(block=False)
            except Empty:
                return latest