sohojoe committed on
Commit
afc1e50
1 Parent(s): bcea2ea

add cancel and fix a bunch of async problems

charles_actor.py CHANGED
@@ -32,7 +32,7 @@ class CharlesActor:
         self._respond_to_prompt_actor = RespondToPromptActor.remote()
 
         self._debug_queue = [
-            # "hello, how are you today?",
+            "hello, how are you today?",
             # "hmm, interesting, tell me more about that.",
         ]
         print("010")
@@ -55,7 +55,7 @@ class CharlesActor:
         while True:
             if len(self._debug_queue) > 0:
                 prompt = self._debug_queue.pop(0)
-                await self._respond_to_prompt_actor.enqueue(prompt)
+                self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
             audio_frames = await self._streamlit_av_queue.get_audio_frames_async()
             if len(audio_frames) > 0:
                 total_audio_frames += len(audio_frames)
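
The charles_actor.py change above swaps a blocking `await ...enqueue(prompt)` for a fire-and-forget `enqueue_prompt.remote(prompt)`. A minimal sketch of why that matters, using a hypothetical `EchoActor` that is not part of this repo: `.remote()` hands back an `ObjectRef` immediately, so the calling loop keeps polling the AV queues instead of waiting for the prompt to be processed.

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class EchoActor:
    # Stand-in for RespondToPromptActor.enqueue_prompt
    def enqueue_prompt(self, prompt):
        return f"queued: {prompt}"

actor = EchoActor.remote()
ref = actor.enqueue_prompt.remote("hello, how are you today?")  # returns an ObjectRef right away
print(ray.get(ref))  # block only when (and if) the result is actually needed
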
respond_to_prompt_actor.py CHANGED
@@ -3,7 +3,7 @@ from ray.util.queue import Queue
 from dotenv import load_dotenv
 from audio_stream_processor import AudioStreamProcessor
 from streaming_chat_service import StreamingChatService
-
+import asyncio
 # from ray.actor import ActorHandle
 
 @ray.remote
@@ -14,15 +14,25 @@ class PromptToLLMActor:
         self.output_queue = output_queue
         self.audio_processor = AudioStreamProcessor()
         self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
+        self.cancel_event = None
 
     async def run(self):
         while True:
-            prompt = self.input_queue.get()
-            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt):
+            prompt = await self.input_queue.get_async()
+            self.cancel_event = asyncio.Event()
+            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt, self.cancel_event):
                 if self.chat_service.ignore_sentence(sentence):
                     continue
                 print(f"{sentence}")
-                self.output_queue.put(sentence)
+                await self.output_queue.put_async(sentence)
+
+    def cancel(self):
+        if self.cancel_event:
+            self.cancel_event.set()
+        while not self.input_queue.empty():
+            self.input_queue.get()
+        while not self.output_queue.empty():
+            self.output_queue.get()
 
 @ray.remote
 class LLMSentanceToSpeechActor:
@@ -32,12 +42,23 @@ class LLMSentanceToSpeechActor:
         self.output_queue = output_queue
         self.audio_processor = AudioStreamProcessor()
         self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
+        self.cancel_event = None
 
     async def run(self):
         while True:
-            sentance = self.input_queue.get()
-            async for chunk in self.chat_service.get_speech_chunks_async(sentance):
-                self.output_queue.put(chunk)
+            sentance = await self.input_queue.get_async()
+            self.cancel_event = asyncio.Event()
+            async for chunk in self.chat_service.get_speech_chunks_async(sentance, self.cancel_event):
+                await self.output_queue.put_async(chunk)
+
+    def cancel(self):
+        if self.cancel_event:
+            self.cancel_event.set()
+        while not self.input_queue.empty():
+            self.input_queue.get()
+        while not self.output_queue.empty():
+            self.output_queue.get()
+
 
 @ray.remote
 class SpeechToSpeakerActor:
@@ -49,8 +70,12 @@ class SpeechToSpeakerActor:
 
     async def run(self):
         while True:
-            audio_chunk = self.input_queue.get()
+            audio_chunk = await self.input_queue.get_async()
             self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])
+
+    def cancel(self):
+        while not self.input_queue.empty():
+            self.input_queue.get()
 
 @ray.remote
 class RespondToPromptActor:
@@ -65,13 +90,20 @@ class RespondToPromptActor:
         self.speech_to_speaker = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
 
         # Start the pipeline components.
-        print ("Starting pipeline components")
         self.prompt_to_llm.run.remote()
-        print ("prompt_to_llm running")
         self.llm_sentence_to_speech.run.remote()
-        print ("llm_sentence_to_speech running")
         self.speech_to_speaker.run.remote()
-        print ("speech_to_speaker running")
 
     def enqueue_prompt(self, prompt):
-        self.prompt_queue.put(prompt)
+        print("flush anything queued")
+        prompt_to_llm_future = self.prompt_to_llm.cancel.remote()
+        llm_sentence_to_speech_future = self.llm_sentence_to_speech.cancel.remote()
+        speech_to_speaker_future = self.speech_to_speaker.cancel.remote()
+        ray.get([
+            prompt_to_llm_future,
+            llm_sentence_to_speech_future,
+            speech_to_speaker_future,
+        ])
+        self.prompt_queue.put(prompt)
+        print("Enqueued prompt")
+
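
Each pipeline stage in respond_to_prompt_actor.py now follows the same cancel pattern: the actor keeps an `asyncio.Event`, the streaming generator it drives checks that event, and `cancel()` sets the event and drains the Ray queues. A stripped-down, self-contained sketch of that pattern; `CancellableWorker` and its fake `_process` loop are illustrative only, not code from this repo.

import asyncio
import time

import ray
from ray.util.queue import Queue


@ray.remote
class CancellableWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.cancel_event = None

    async def run(self):
        while True:
            item = await self.input_queue.get_async()
            self.cancel_event = asyncio.Event()
            async for result in self._process(item, self.cancel_event):
                await self.output_queue.put_async(result)

    async def _process(self, item, cancel_event):
        # Stand-in for the LLM / TTS streaming generators.
        for i in range(20):
            if cancel_event.is_set():  # bail out as soon as cancel() fires
                return
            await asyncio.sleep(0.05)
            yield f"{item}-{i}"

    def cancel(self):
        if self.cancel_event:
            self.cancel_event.set()
        while not self.input_queue.empty():  # drop anything still queued
            self.input_queue.get()
        while not self.output_queue.empty():
            self.output_queue.get()


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    inq, outq = Queue(), Queue()
    worker = CancellableWorker.remote(inq, outq)
    worker.run.remote()              # fire-and-forget, like the pipeline stages
    inq.put("prompt-1")
    time.sleep(0.25)                 # let a few results stream through
    ray.get(worker.cancel.remote())  # stop mid-stream and flush both queues

Because the workers are async Ray actors, `cancel()` and `run()` share one event loop, so the event can be set while `run()` is suspended at an `await`; `RespondToPromptActor.enqueue_prompt` then blocks on `ray.get` over the three `cancel.remote()` futures so the pipeline is flushed before the new prompt is queued.
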
streaming_chat_service.py CHANGED
@@ -144,7 +144,7 @@ I fell off the pink step, and I had an accident.
         self._messages.append({"role": "assistant", "content": agent_response})
         return agent_response
 
-    async def get_responses_as_sentances_async(self, prompt):
+    async def get_responses_as_sentances_async(self, prompt, cancel_event):
         self._messages.append({"role": "user", "content": prompt})
         agent_response = ""
         current_sentence = ""
@@ -157,6 +157,8 @@ I fell off the pink step, and I had an accident.
         )
 
         async for chunk in response:
+            if cancel_event.is_set():
+                return
             chunk_message = chunk['choices'][0]['delta']
             if 'content' in chunk_message:
                 chunk_text = chunk_message['content']
@@ -167,11 +169,13 @@ I fell off the pink step, and I had an accident.
                     yield text_to_speak
                     current_sentence = current_sentence[len(text_to_speak):]
 
+        if cancel_event.is_set():
+            return
         if len(current_sentence) > 0:
             yield current_sentence
         self._messages.append({"role": "assistant", "content": agent_response})
 
-    async def get_speech_chunks_async(self, text_to_speak):
+    async def get_speech_chunks_async(self, text_to_speak, cancel_event):
        stream = self._speech_service.stream(text_to_speak)
        stream, stream_backup = itertools.tee(stream)
        while True:
@@ -183,6 +187,8 @@ I fell off the pink step, and I had an accident.
 
            # Run next(stream) in a separate thread to avoid blocking the event loop
            chunk = await asyncio.to_thread(next, stream)
+            if cancel_event.is_set():
+                return
            yield chunk
 
    def enqueue_speech_bytes_to_play(self, speech_bytes):
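
On the service side, both streaming generators in streaming_chat_service.py now take a `cancel_event` and simply return at natural break points instead of raising. A hedged usage sketch of those signatures; `FakeChatService` below only mimics the two methods (the real `StreamingChatService` needs an `AudioStreamProcessor`, OpenAI credentials, and a speech service), and the half-second timer stands in for a user barge-in.

import asyncio


class FakeChatService:
    async def get_responses_as_sentances_async(self, prompt, cancel_event):
        for i in range(10):
            if cancel_event.is_set():
                return
            await asyncio.sleep(0.1)   # stand-in for streaming LLM tokens
            yield f"sentence {i} about {prompt!r}."

    async def get_speech_chunks_async(self, text_to_speak, cancel_event):
        for _ in range(3):
            if cancel_event.is_set():
                return
            await asyncio.sleep(0.05)  # stand-in for streaming TTS audio
            yield b"\x00" * 320        # fake audio chunk


async def main():
    service = FakeChatService()
    cancel_event = asyncio.Event()
    # Simulate a barge-in: cancel half a second into the response.
    asyncio.get_running_loop().call_later(0.5, cancel_event.set)
    async for sentence in service.get_responses_as_sentances_async("hello", cancel_event):
        print(sentence)
        async for chunk in service.get_speech_chunks_async(sentence, cancel_event):
            pass  # would be handed to the speaker queue


asyncio.run(main())
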