sohojoe committed
Commit 72e4889 • 1 Parent(s): aec6f97

refactor: rename webrtc... to app_interface...

webrtc_av_queue_actor.py → app_interface_actor.py RENAMED
@@ -6,13 +6,19 @@ import numpy as np
 
 
 @ray.remote
-class WebRtcAVQueueActor:
+class AppInterfaceActor:
     def __init__(self):
         self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
         self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
         self.audio_output_queue = Queue(maxsize=3000) # Adjust the size as needed
         self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
 
+    @staticmethod
+    def get_singleton():
+        return AppInterfaceActor.options(
+            name="AppInterfaceActor",
+            get_if_exists=True,
+        ).remote()
 
     async def enqueue_video_input_frame(self, shared_tensor_ref):
         if self.video_input_queue.full():
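The new get_singleton helper relies on Ray's named-actor support: options(name=..., get_if_exists=True) returns a handle to an already-running actor with that name and only creates a new one if none exists, so every caller shares a single actor and, with it, a single set of queues. A minimal standalone sketch of the same pattern, using a hypothetical Counter actor rather than the project's classes:

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class Counter:  # hypothetical actor, for illustration only
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

# The first call creates the named actor; the second finds the same one
# because get_if_exists=True, so both handles share state.
a = Counter.options(name="Counter", get_if_exists=True).remote()
b = Counter.options(name="Counter", get_if_exists=True).remote()
print(ray.get(a.incr.remote()))  # 1
print(ray.get(b.incr.remote()))  # 2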
streamlit_av_queue.py CHANGED
@@ -8,7 +8,7 @@ import cv2
 import numpy as np
 import ray
 from ray.util.queue import Queue
-from webrtc_av_queue_actor import WebRtcAVQueueActor
+from app_interface_actor import AppInterfaceActor
 import pydub
 import torch
 
@@ -19,10 +19,7 @@ class StreamlitAVQueue:
         self._listening = True
         self._looking = False
         self._lock = threading.Lock()
-        self.queue_actor = WebRtcAVQueueActor.options(
-            name="WebRtcAVQueueActor",
-            get_if_exists=True,
-        ).remote()
+        self.app_interface_actor = AppInterfaceActor.get_singleton()
         self._video_output_frame = None
 
     def set_looking_listening(self, looking, listening: bool):
@@ -38,14 +35,14 @@ class StreamlitAVQueue:
         try:
             with self._lock:
                 should_look = self._looking
-                next_video_output_frame = await self.queue_actor.get_video_output_frame.remote()
+                next_video_output_frame = await self.app_interface_actor.get_video_output_frame.remote()
             if next_video_output_frame is not None:
                 self._video_output_frame = next_video_output_frame
             for i, frame in enumerate(frames):
                 user_image = frame.to_ndarray(format="rgb24")
                 if should_look:
                     shared_tensor_ref = ray.put(user_image)
-                    await self.queue_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
+                    await self.app_interface_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
                 if self._video_output_frame is not None:
                     frame = self._video_output_frame
                     # resize user image to 1/4 size
@@ -85,7 +82,7 @@ class StreamlitAVQueue:
                     sound_chunk += sound
             shared_buffer = np.array(sound_chunk.get_array_of_samples())
             shared_buffer_ref = ray.put(shared_buffer)
-            await self.queue_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
+            await self.app_interface_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
         except Exception as e:
             print (e)
 
@@ -97,7 +94,7 @@ class StreamlitAVQueue:
             # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
             assert frame.format.bytes == 2
             assert frame.format.name == 's16'
-            frame_as_bytes = await self.queue_actor.get_audio_output_frame.remote()
+            frame_as_bytes = await self.app_interface_actor.get_audio_output_frame.remote()
             if frame_as_bytes:
                 # print(f"frame_as_bytes: {len(frame_as_bytes)}")
                 assert len(frame_as_bytes) == frame.samples * frame.format.bytes
@@ -116,15 +113,15 @@ class StreamlitAVQueue:
         return new_frames
 
     async def get_audio_input_frames_async(self) -> List[av.AudioFrame]:
-        shared_buffers = await self.queue_actor.get_audio_input_frames.remote()
+        shared_buffers = await self.app_interface_actor.get_audio_input_frames.remote()
         return shared_buffers
 
     async def get_video_frames_async(self) -> List[av.AudioFrame]:
-        shared_tensors = await self.queue_actor.get_video_input_frames.remote()
+        shared_tensors = await self.app_interface_actor.get_video_input_frames.remote()
         return shared_tensors
 
     def get_audio_output_queue(self)->Queue:
-        return self.queue_actor.get_audio_output_queue.remote()
+        return self.app_interface_actor.get_audio_output_queue.remote()
 
     def get_video_output_queue(self)->Queue:
-        return self.queue_actor.get_video_output_queue.remote()
+        return self.app_interface_actor.get_video_output_queue.remote()
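With the rename, callers no longer build the actor handle through options themselves; any process on the same Ray cluster can reach the shared queues through the new module. A rough usage sketch, assuming Ray is initialized and app_interface_actor.py is importable:

import numpy as np
import ray
from app_interface_actor import AppInterfaceActor

ray.init(ignore_reinit_error=True)

# Resolves to the single actor named "AppInterfaceActor", creating it if needed.
actor = AppInterfaceActor.get_singleton()

# Enqueue one video frame by object-store reference, mirroring StreamlitAVQueue.
frame = np.zeros((480, 640, 3), dtype=np.uint8)  # placeholder RGB frame
ray.get(actor.enqueue_video_input_frame.remote(ray.put(frame)))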