File size: 1,372 Bytes
62a21bd
 
 
 
 
 
 
 
 
 
 
 
 
 
ad67495
 
 
62a21bd
 
 
ad67495
 
 
62a21bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


@ray.remote
class InputAVQueueActor:
    """Ray actor that buffers incoming audio and video items in two bounded queues.

    Each stream has its own ``ray.util.queue.Queue``. When a stream's queue is
    full, the oldest buffered item is dropped to make room for the new one, so
    enqueueing favors recent data over completeness. Consumers drain a stream
    with ``get_audio_frames`` / ``get_video_frames``, which return the
    buffered items oldest first and leave the queue empty.
    """

    def __init__(self, maxsize=100):
        """Create the audio and video queues.

        Args:
            maxsize: Capacity used for both queues; passed straight through
                to ``Queue(maxsize=...)``. Defaults to 100.
        """
        self.audio_queue = Queue(maxsize=maxsize)
        self.video_queue = Queue(maxsize=maxsize)

    def _put_dropping_oldest(self, queue, item):
        """Put ``item`` on ``queue``, discarding the oldest entry if it is full."""
        if queue.full():
            # Make room by removing the head of the queue; the evicted item is
            # intentionally discarded.
            queue.get()
        queue.put(item)

    def _drain(self, queue) -> list:
        """Remove and return every item currently in ``queue``, oldest first."""
        pending = queue.qsize()
        if not pending:
            return []
        # One batched fetch instead of an empty()/get() pair per item.
        # NOTE(review): relies on Queue.get_nowait_batch from ray.util.queue;
        # it raises Empty if fewer than `pending` items are available, which
        # cannot happen while this actor is the queue's only user.
        return queue.get_nowait_batch(pending)

    def enqueue_video_frame(self, shared_tensor_ref):
        """Buffer one video item, evicting the oldest one if the queue is full.

        Args:
            shared_tensor_ref: The item to buffer; stored and returned as-is.
        """
        self._put_dropping_oldest(self.video_queue, shared_tensor_ref)

    def enqueue_audio_frame(self, shared_buffer_ref):
        """Buffer one audio item, evicting the oldest one if the queue is full.

        Args:
            shared_buffer_ref: The item to buffer; stored and returned as-is.
        """
        self._put_dropping_oldest(self.audio_queue, shared_buffer_ref)

    def get_audio_frames(self) -> list:
        """Drain the audio queue.

        Returns:
            A list of all buffered audio items in FIFO order; empty if nothing
            is buffered.
        """
        return self._drain(self.audio_queue)

    def get_video_frames(self) -> list:
        """Drain the video queue.

        Returns:
            A list of all buffered video items in FIFO order; empty if nothing
            is buffered.
        """
        return self._drain(self.video_queue)