sohojoe commited on
Commit
df0ea75
1 Parent(s): ae52b65

fix async issues and flush ffmpeg actor

Browse files
charles_actor.py CHANGED
@@ -62,7 +62,7 @@ class CharlesActor:
62
  while True:
63
  if len(self._debug_queue) > 0:
64
  prompt = self._debug_queue.pop(0)
65
- self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
66
  audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
67
  if len(audio_frames) > 0:
68
  total_audio_frames += len(audio_frames)
@@ -75,7 +75,7 @@ class CharlesActor:
75
  if len(process_speech_to_text_future) > 0:
76
  ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
77
  if ready:
78
- prompt, speaker_finished = ray.get(process_speech_to_text_future[0])
79
  del process_speech_to_text_future[0]
80
 
81
  if speaker_finished and len(prompt) > 0:
@@ -86,7 +86,7 @@ class CharlesActor:
86
  table_content = "| System 1 Audio History |\n| --- |\n"
87
  table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
88
  self._system_one_audio_history_output = table_content
89
- self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
90
 
91
  # video_frames = await self._streamlit_av_queue.get_video_frames_async()
92
  # if len(video_frames) > 0:
 
62
  while True:
63
  if len(self._debug_queue) > 0:
64
  prompt = self._debug_queue.pop(0)
65
+ await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
66
  audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
67
  if len(audio_frames) > 0:
68
  total_audio_frames += len(audio_frames)
 
75
  if len(process_speech_to_text_future) > 0:
76
  ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
77
  if ready:
78
+ prompt, speaker_finished = await process_speech_to_text_future[0]
79
  del process_speech_to_text_future[0]
80
 
81
  if speaker_finished and len(prompt) > 0:
 
86
  table_content = "| System 1 Audio History |\n| --- |\n"
87
  table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
88
  self._system_one_audio_history_output = table_content
89
+ await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
90
 
91
  # video_frames = await self._streamlit_av_queue.get_video_frames_async()
92
  # if len(video_frames) > 0:
d_app.py CHANGED
@@ -104,8 +104,12 @@ async def main():
104
  system_one_audio_status.write("Charles is sleeping.")
105
  pass
106
  if charles_actor is not None:
107
- audio_history = ray.get(charles_actor.get_system_one_audio_history_output.remote())
108
- system_one_audio_history_output.markdown(audio_history)
 
 
 
 
109
  await asyncio.sleep(0.1)
110
 
111
  except Exception as e:
 
104
  system_one_audio_status.write("Charles is sleeping.")
105
  pass
106
  if charles_actor is not None:
107
+ try:
108
+ audio_history = await charles_actor.get_system_one_audio_history_output.remote()
109
+ system_one_audio_history_output.markdown(audio_history)
110
+ except Exception as e:
111
+ # assume we disconnected
112
+ charles_actor = None
113
  await asyncio.sleep(0.1)
114
 
115
  except Exception as e:
ffmpeg_converter_actor.py CHANGED
@@ -57,9 +57,9 @@ class FFMpegConverterActor:
57
  # def has_processed_all_data(self):
58
  # return self.process.poll() is not None
59
 
60
- def flush_output_queue(self):
61
  while not self.output_queue.empty():
62
- self.output_queue.get()
63
 
64
  def close(self):
65
  self.input_pipe.close()
 
57
  # def has_processed_all_data(self):
58
  # return self.process.poll() is not None
59
 
60
+ async def flush_output_queue(self):
61
  while not self.output_queue.empty():
62
+ await self.output_queue.get_async()
63
 
64
  def close(self):
65
  self.input_pipe.close()
respond_to_prompt_actor.py CHANGED
@@ -26,13 +26,13 @@ class PromptToLLMActor:
26
  print(f"{sentence}")
27
  await self.output_queue.put_async(sentence)
28
 
29
- def cancel(self):
30
  if self.cancel_event:
31
  self.cancel_event.set()
32
  while not self.input_queue.empty():
33
- self.input_queue.get()
34
  while not self.output_queue.empty():
35
- self.output_queue.get()
36
 
37
  @ray.remote
38
  class LLMSentanceToSpeechActor:
@@ -51,13 +51,13 @@ class LLMSentanceToSpeechActor:
51
  async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event):
52
  await self.output_queue.put_async(chunk)
53
 
54
- def cancel(self):
55
  if self.cancel_event:
56
  self.cancel_event.set()
57
  while not self.input_queue.empty():
58
- self.input_queue.get()
59
  while not self.output_queue.empty():
60
- self.output_queue.get()
61
 
62
 
63
  @ray.remote
@@ -74,9 +74,9 @@ class SpeechToSpeakerActor:
74
  # print (f"Got audio chunk {len(audio_chunk)}")
75
  self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])
76
 
77
- def cancel(self):
78
  while not self.input_queue.empty():
79
- self.input_queue.get()
80
 
81
  @ray.remote
82
  class SpeechToConverterActor:
@@ -115,18 +115,18 @@ class RespondToPromptActor:
115
  self.llm_sentence_to_speech.run.remote()
116
  self.speech_output.run.remote()
117
 
118
- def enqueue_prompt(self, prompt):
119
  print("flush anything queued")
120
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
121
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
122
  speech_to_speaker_future = self.speech_output.cancel.remote()
123
  ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
124
- ray.get([
125
  prompt_to_llm_future,
126
  llm_sentence_to_speech_future,
127
  speech_to_speaker_future,
128
  ffmpeg_converter_future,
129
- ])
130
  self.prompt_queue.put(prompt)
131
  print("Enqueued prompt")
132
 
 
26
  print(f"{sentence}")
27
  await self.output_queue.put_async(sentence)
28
 
29
+ async def cancel(self):
30
  if self.cancel_event:
31
  self.cancel_event.set()
32
  while not self.input_queue.empty():
33
+ await self.input_queue.get_async()
34
  while not self.output_queue.empty():
35
+ await self.output_queue.get_async()
36
 
37
  @ray.remote
38
  class LLMSentanceToSpeechActor:
 
51
  async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event):
52
  await self.output_queue.put_async(chunk)
53
 
54
+ async def cancel(self):
55
  if self.cancel_event:
56
  self.cancel_event.set()
57
  while not self.input_queue.empty():
58
+ await self.input_queue.get_async()
59
  while not self.output_queue.empty():
60
+ await self.output_queue.get_async()
61
 
62
 
63
  @ray.remote
 
74
  # print (f"Got audio chunk {len(audio_chunk)}")
75
  self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])
76
 
77
+ async def cancel(self):
78
  while not self.input_queue.empty():
79
+ await self.input_queue.get_async()
80
 
81
  @ray.remote
82
  class SpeechToConverterActor:
 
115
  self.llm_sentence_to_speech.run.remote()
116
  self.speech_output.run.remote()
117
 
118
+ async def enqueue_prompt(self, prompt):
119
  print("flush anything queued")
120
  prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
121
  llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
122
  speech_to_speaker_future = self.speech_output.cancel.remote()
123
  ffmpeg_converter_future = self.ffmepg_converter_actor.flush_output_queue.remote()
124
+ await asyncio.gather(
125
  prompt_to_llm_future,
126
  llm_sentence_to_speech_future,
127
  speech_to_speaker_future,
128
  ffmpeg_converter_future,
129
+ )
130
  self.prompt_queue.put(prompt)
131
  print("Enqueued prompt")
132