from typing import Any, List

import ray
from ray.util.queue import Queue
from ray.actor import ActorHandle
import torch
import numpy as np


def _put_evicting_oldest(queue: Queue, item: Any) -> None:
    """Append *item* to *queue*, discarding the oldest entry first if full."""
    if queue.full():
        # Make room so the put below cannot block on a full queue; the
        # evicted entry is intentionally dropped.
        queue.get()
    queue.put(item)


def _drain(queue: Queue) -> List[Any]:
    """Remove and return every item currently in *queue*, oldest first.

    Returns an empty list when the queue holds nothing.
    """
    # The Ray queue is backed by its own actor, so every call is a remote
    # round trip: take one size snapshot and fetch everything in one batch
    # instead of polling empty()/get() once per item.
    # NOTE(review): assumes nothing else consumes the queue between the two
    # calls (i.e. the owning actor handles one method call at a time) --
    # otherwise get_nowait_batch may raise ray.util.queue.Empty.
    pending = queue.qsize()
    if not pending:
        return []
    return queue.get_nowait_batch(pending)


@ray.remote
class InputAVQueueActor:
    """Ray actor holding two bounded FIFO buffers: audio and video frames.

    Enqueueing into a full buffer evicts the oldest entry, so each buffer
    keeps at most ``maxsize`` of the most recently enqueued items. The
    getters drain a buffer completely and return its contents in order.
    """

    def __init__(self, maxsize: int = 100):
        """Create the two queues.

        Args:
            maxsize: Capacity of each queue; adjust as needed. Defaults to
                100.
        """
        self.audio_queue = Queue(maxsize=maxsize)
        self.video_queue = Queue(maxsize=maxsize)

    def enqueue_video_frame(self, shared_tensor_ref):
        """Buffer a video frame, evicting the oldest one if the queue is full.

        Args:
            shared_tensor_ref: The item to store; it is queued as-is.
                Presumably a reference to a shared tensor -- confirm against
                the caller.
        """
        _put_evicting_oldest(self.video_queue, shared_tensor_ref)

    def enqueue_audio_frame(self, shared_buffer_ref):
        """Buffer an audio frame, evicting the oldest one if the queue is full.

        Args:
            shared_buffer_ref: The item to store; it is queued as-is.
                Presumably a reference to a shared buffer -- confirm against
                the caller.
        """
        _put_evicting_oldest(self.audio_queue, shared_buffer_ref)

    def get_audio_frames(self):
        """Drain the audio queue.

        Returns:
            A list of all buffered audio items, oldest first; empty if none
            are buffered.
        """
        return _drain(self.audio_queue)

    def get_video_frames(self):
        """Drain the video queue.

        Returns:
            A list of all buffered video items, oldest first; empty if none
            are buffered.
        """
        return _drain(self.video_queue)