sohojoe committed on
Commit 28b5e08
1 Parent(s): 72e4889

remove charles dependency on streamlit_av_queue

Files changed (3)
  1. app_interface_actor.py +33 -27
  2. charles_actor.py +8 -12
  3. streamlit_av_queue.py +4 -18
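
In short: charles_actor.py previously reached the shared AV queues through a StreamlitAVQueue instance, and streamlit_av_queue.py exposed pass-through getters for that purpose. After this commit both sides hold the same AppInterfaceActor handle and call it directly. A condensed before/after of the charles_actor call sites, lifted from the hunks below:

# before: input/output flows through the Streamlit-side wrapper
self._streamlit_av_queue = StreamlitAVQueue()
audio_frames = await self._streamlit_av_queue.get_audio_input_frames_async()

# after: charles_actor talks to the shared actor itself
self._app_interface_actor = AppInterfaceActor.get_singleton()
audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()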
app_interface_actor.py CHANGED
@@ -20,6 +20,7 @@ class AppInterfaceActor:
         get_if_exists=True,
     ).remote()
 
+    # functions for UI to enqueue input, dequeue output
     async def enqueue_video_input_frame(self, shared_tensor_ref):
         if self.video_input_queue.full():
             evicted_item = await self.video_input_queue.get_async()
@@ -32,40 +33,45 @@ class AppInterfaceActor:
             del evicted_item
         await self.audio_input_queue.put_async(shared_buffer_ref)
 
-    async def get_audio_input_frames(self):
+    async def dequeue_audio_output_frame_async(self):
+        if self.audio_output_queue.empty():
+            return None
+        frame = await self.audio_output_queue.get_async()
+        return frame
+
+    async def dequeue_video_output_frames_async(self):
+        video_frames = []
+        if self.video_output_queue.empty():
+            return video_frames
+        while not self.video_output_queue.empty():
+            shared_tensor = await self.video_output_queue.get_async()
+            video_frames.append(shared_tensor)
+        return video_frames
+
+    # functions for application to dequeue input, enqueue output
+    def get_audio_output_queue(self)->Queue:
+        return self.audio_output_queue
+
+    async def enqueue_video_output_frame(self, shared_tensor_ref):
+        if self.video_output_queue.full():
+            evicted_item = await self.video_output_queue.get_async()
+            del evicted_item
+        await self.video_output_queue.put_async(shared_tensor_ref)
+
+    async def dequeue_audio_input_frames_async(self):
         audio_frames = []
         if self.audio_input_queue.empty():
             return audio_frames
         while not self.audio_input_queue.empty():
-            shared_tensor_ref = await self.audio_input_queue.get_async()
-            audio_frames.append(shared_tensor_ref)
+            shared_tensor = await self.audio_input_queue.get_async()
+            audio_frames.append(shared_tensor)
         return audio_frames
 
-    async def get_video_input_frames(self):
+    async def dequeue_video_input_frames_async(self):
         video_frames = []
         if self.video_input_queue.empty():
             return video_frames
         while not self.video_input_queue.empty():
-            shared_tensor_ref = await self.video_input_queue.get_async()
-            video_frames.append(shared_tensor_ref)
-        return video_frames
-
-    def get_audio_output_queue(self)->Queue:
-        return self.audio_output_queue
-
-    def get_video_output_queue(self)->Queue:
-        return self.video_output_queue
-
-    async def get_audio_output_frame(self):
-        if self.audio_output_queue.empty():
-            return None
-        frame = await self.audio_output_queue.get_async()
-        return frame
-
-    async def get_video_output_frame(self):
-        if self.video_output_queue.empty():
-            return None
-        frame = None
-        while not self.video_output_queue.empty():
-            frame = await self.video_output_queue.get_async()
-        return frame
+            shared_tensor = await self.video_input_queue.get_async()
+            video_frames.append(shared_tensor)
+        return video_frames
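
For orientation, a minimal self-contained sketch of the queue-owning-actor pattern this file now exposes: one named Ray actor holds bounded queues, the UI side enqueues input frames, and the application side drains them. The class name DemoInterfaceActor, the queue size, and the main() wiring are illustrative assumptions, not code from this repo.

import asyncio
import ray
from ray.util.queue import Queue


@ray.remote
class DemoInterfaceActor:
    """Owns bounded queues; the UI enqueues input, the app dequeues it."""

    def __init__(self):
        self.video_input_queue = Queue(maxsize=10)

    async def enqueue_video_input_frame(self, frame):
        if self.video_input_queue.full():
            evicted = await self.video_input_queue.get_async()  # drop oldest
            del evicted
        await self.video_input_queue.put_async(frame)

    async def dequeue_video_input_frames_async(self):
        frames = []
        while not self.video_input_queue.empty():
            frames.append(await self.video_input_queue.get_async())
        return frames


def get_demo_singleton():
    # get_if_exists=True returns the already-running named actor if present,
    # so every process that calls this shares one instance.
    return DemoInterfaceActor.options(
        name="DemoInterfaceActor", get_if_exists=True
    ).remote()


async def main():
    ray.init(ignore_reinit_error=True)
    ui_side, app_side = get_demo_singleton(), get_demo_singleton()
    await ui_side.enqueue_video_input_frame.remote(b"frame-bytes")
    print(await app_side.dequeue_video_input_frames_async.remote())


if __name__ == "__main__":
    asyncio.run(main())

The eviction-on-full behaviour mirrors what enqueue_video_input_frame and the new enqueue_video_output_frame do in the diff above: drop the oldest item rather than block the producer.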
charles_actor.py CHANGED
@@ -29,12 +29,11 @@ class CharlesActor:
 
     async def _initalize_resources(self):
         # Initialize resources
-        print("000 - create StreamlitAVQueue")
-        self._state = "000 - creating StreamlitAVQueue"
-        from streamlit_av_queue import StreamlitAVQueue
-        self._streamlit_av_queue = StreamlitAVQueue()
-        self._audio_output_queue = await self._streamlit_av_queue.get_audio_output_queue()
-        self._video_output_queue = await self._streamlit_av_queue.get_video_output_queue()
+        print("000 - create AppInterfaceActor")
+        self._state = "000 - creating AppInterfaceActor"
+        from app_interface_actor import AppInterfaceActor
+        self._app_interface_actor = AppInterfaceActor.get_singleton()
+        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()
 
         print("001 - create RespondToPromptActor")
         self._state = "001 - creating RespondToPromptActor"
@@ -114,8 +113,8 @@ class CharlesActor:
             env_state = await self._environment_state_actor.begin_next_step.remote()
             self._environment_state = env_state
             self._agent_state_actor.begin_step.remote()
-            audio_frames = await self._streamlit_av_queue.get_audio_input_frames_async()
-            video_frames = await self._streamlit_av_queue.get_video_frames_async()
+            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
+            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()
 
             if len(audio_frames) > 0:
                 total_audio_frames += len(audio_frames)
@@ -215,11 +214,8 @@ class CharlesActor:
                 is_talking = bool(count > 0)
                 has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
                 frame = self._animator.update(is_talking)
-                if self._video_output_queue.full():
-                    evicted_item = await self._video_output_queue.get_async()
-                    del evicted_item
                 frame_ref = ray.put(frame)
-                await self._video_output_queue.put_async(frame_ref)
+                await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)
 
             loops+=1
             self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. Is speaking: {is_talking}({count}). {vector_debug}"
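
charles_actor no longer constructs a StreamlitAVQueue; it asks for the shared actor with AppInterfaceActor.get_singleton() and lets enqueue_video_output_frame handle the full-queue eviction that used to live in this loop. Judging from the get_if_exists=True / .remote() context at the top of app_interface_actor.py, the accessor presumably boils down to the helper below (shown as a free function for brevity; the actor name is an assumption):

def get_singleton():
    # get_if_exists=True creates the named actor on first use and returns the
    # existing handle afterwards, so charles_actor and the Streamlit callbacks
    # end up sharing one AppInterfaceActor instance.
    return AppInterfaceActor.options(
        name="AppInterfaceActor", get_if_exists=True
    ).remote()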
streamlit_av_queue.py CHANGED
@@ -35,9 +35,9 @@ class StreamlitAVQueue:
         try:
             with self._lock:
                 should_look = self._looking
-                next_video_output_frame = await self.app_interface_actor.get_video_output_frame.remote()
-                if next_video_output_frame is not None:
-                    self._video_output_frame = next_video_output_frame
+                video_output_frames = await self.app_interface_actor.dequeue_video_output_frames_async.remote()
+                if len(video_output_frames) > 0:
+                    self._video_output_frame = video_output_frames[-1]
             for i, frame in enumerate(frames):
                 user_image = frame.to_ndarray(format="rgb24")
                 if should_look:
@@ -94,7 +94,7 @@ class StreamlitAVQueue:
             # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
             assert frame.format.bytes == 2
             assert frame.format.name == 's16'
-            frame_as_bytes = await self.app_interface_actor.get_audio_output_frame.remote()
+            frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
             if frame_as_bytes:
                 # print(f"frame_as_bytes: {len(frame_as_bytes)}")
                 assert len(frame_as_bytes) == frame.samples * frame.format.bytes
@@ -111,17 +111,3 @@ class StreamlitAVQueue:
         except Exception as e:
             print (e)
         return new_frames
-
-    async def get_audio_input_frames_async(self) -> List[av.AudioFrame]:
-        shared_buffers = await self.app_interface_actor.get_audio_input_frames.remote()
-        return shared_buffers
-
-    async def get_video_frames_async(self) -> List[av.AudioFrame]:
-        shared_tensors = await self.app_interface_actor.get_video_input_frames.remote()
-        return shared_tensors
-
-    def get_audio_output_queue(self)->Queue:
-        return self.app_interface_actor.get_audio_output_queue.remote()
-
-    def get_video_output_queue(self)->Queue:
-        return self.app_interface_actor.get_video_output_queue.remote()
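
The video callback now drains every pending output frame and keeps only the newest one (video_output_frames[-1]), so a browser that renders slower than charles_actor animates never falls further and further behind. The same drain-and-keep-latest idea in isolation, assuming a ray.util.queue.Queue-style interface; the helper name is illustrative:

async def latest_or_previous(queue, previous_frame):
    # Drain whatever has accumulated; return the newest item, or fall back to
    # the previously shown frame when nothing new arrived since the last call.
    latest = previous_frame
    while not queue.empty():
        latest = await queue.get_async()
    return latest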