sohojoe committed
Commit bcea2ea
Parent(s): 98ec0ec

create respond to prompt actor

charles_actor.py CHANGED
@@ -27,10 +27,10 @@ class CharlesActor:
         from speech_to_text_vosk_actor import SpeechToTextVoskActor
         self._speech_to_text_actor = SpeechToTextVoskActor.remote()

-        from chat_pipeline import ChatPipeline
-        self._chat_pipeline = ChatPipeline()
-        await self._chat_pipeline.start()
-
+        print("003")
+        from respond_to_prompt_actor import RespondToPromptActor
+        self._respond_to_prompt_actor = RespondToPromptActor.remote()
+
         self._debug_queue = [
             # "hello, how are you today?",
             # "hmm, interesting, tell me more about that.",
@@ -55,7 +55,7 @@
         while True:
             if len(self._debug_queue) > 0:
                 prompt = self._debug_queue.pop(0)
-                await self._chat_pipeline.enqueue(prompt)
+                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
             audio_frames = await self._streamlit_av_queue.get_audio_frames_async()
             if len(audio_frames) > 0:
                 total_audio_frames += len(audio_frames)
@@ -79,7 +79,7 @@
                     table_content = "| System 1 Audio History |\n| --- |\n"
                     table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                     self._system_one_audio_history_output = table_content
-                    await self._chat_pipeline.enqueue(prompt)
+                    self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

             # video_frames = await self._streamlit_av_queue.get_video_frames_async()
             # if len(video_frames) > 0:
@@ -127,5 +127,6 @@ if __name__ == "__main__":
            time.sleep(1)
            state = charles_actor.get_state.remote()
            print(f"Charles is in state: {ray.get(state)}")
-    except KeyboardInterrupt:
+    except KeyboardInterrupt as e:
         print("Script was manually terminated")
+        raise e
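For context on the changed call sites above: RespondToPromptActor.enqueue_prompt is an actor method, so it is invoked through Ray's .remote(), which returns an ObjectRef rather than running inline. A minimal sketch of the awaited form used in the debug-queue branch, assuming a handle created with RespondToPromptActor.remote() (the helper function name is hypothetical, for illustration only):

async def send_prompt(respond_to_prompt_actor, prompt: str) -> None:
    # respond_to_prompt_actor is a handle returned by RespondToPromptActor.remote().
    # .remote() schedules enqueue_prompt on the actor and returns an ObjectRef;
    # awaiting that ObjectRef waits until the prompt has actually been queued.
    await respond_to_prompt_actor.enqueue_prompt.remote(prompt)

The bare call respond_to_prompt_actor.enqueue_prompt.remote(prompt), as used in the System 1 audio-history branch, is fire-and-forget: it schedules the enqueue without waiting on the result.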
chat_pipeline.py → legacy_to_delete/chat_pipeline.py RENAMED
File without changes
debug.py → legacy_to_delete/debug.py RENAMED
File without changes
respond_to_prompt_actor.py ADDED
@@ -0,0 +1,77 @@
+import ray
+from ray.util.queue import Queue
+from dotenv import load_dotenv
+from audio_stream_processor import AudioStreamProcessor
+from streaming_chat_service import StreamingChatService
+
+# from ray.actor import ActorHandle
+
+@ray.remote
+class PromptToLLMActor:
+    def __init__(self, input_queue, output_queue, voice_id):
+        load_dotenv()
+        self.input_queue = input_queue
+        self.output_queue = output_queue
+        self.audio_processor = AudioStreamProcessor()
+        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
+
+    async def run(self):
+        while True:
+            prompt = self.input_queue.get()
+            async for sentence in self.chat_service.get_responses_as_sentances_async(prompt):
+                if self.chat_service.ignore_sentence(sentence):
+                    continue
+                print(f"{sentence}")
+                self.output_queue.put(sentence)
+
+@ray.remote
+class LLMSentanceToSpeechActor:
+    def __init__(self, input_queue, output_queue, voice_id):
+        load_dotenv()
+        self.input_queue = input_queue
+        self.output_queue = output_queue
+        self.audio_processor = AudioStreamProcessor()
+        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
+
+    async def run(self):
+        while True:
+            sentance = self.input_queue.get()
+            async for chunk in self.chat_service.get_speech_chunks_async(sentance):
+                self.output_queue.put(chunk)
+
+@ray.remote
+class SpeechToSpeakerActor:
+    def __init__(self, input_queue, voice_id):
+        load_dotenv()
+        self.input_queue = input_queue
+        self.audio_processor = AudioStreamProcessor()
+        self.chat_service = StreamingChatService(self.audio_processor, voice_id=voice_id)
+
+    async def run(self):
+        while True:
+            audio_chunk = self.input_queue.get()
+            self.chat_service.enqueue_speech_bytes_to_play([audio_chunk])
+
+@ray.remote
+class RespondToPromptActor:
+    def __init__(self):
+        voice_id = "2OviOUQc1JsQRQgNkVBj"
+        self.prompt_queue = Queue(maxsize=100)
+        self.llm_sentence_queue = Queue(maxsize=100)
+        self.speech_chunk_queue = Queue(maxsize=100)
+
+        self.prompt_to_llm = PromptToLLMActor.remote(self.prompt_queue, self.llm_sentence_queue, voice_id)
+        self.llm_sentence_to_speech = LLMSentanceToSpeechActor.remote(self.llm_sentence_queue, self.speech_chunk_queue, voice_id)
+        self.speech_to_speaker = SpeechToSpeakerActor.remote(self.speech_chunk_queue, voice_id)
+
+        # Start the pipeline components.
+        print("Starting pipeline components")
+        self.prompt_to_llm.run.remote()
+        print("prompt_to_llm running")
+        self.llm_sentence_to_speech.run.remote()
+        print("llm_sentence_to_speech running")
+        self.speech_to_speaker.run.remote()
+        print("speech_to_speaker running")
+
+    def enqueue_prompt(self, prompt):
+        self.prompt_queue.put(prompt)
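A rough standalone smoke test for the new pipeline could look like the sketch below. It is not part of the commit and rests on assumptions: Ray can start locally, the .env file provides the API keys that load_dotenv() is expected to surface, and audio playback works on the host.

import asyncio
import ray

from respond_to_prompt_actor import RespondToPromptActor

async def main():
    respond_to_prompt = RespondToPromptActor.remote()
    # enqueue_prompt puts the prompt on prompt_queue; the worker actors started
    # in __init__ then stream LLM sentences -> speech chunks -> speaker output.
    await respond_to_prompt.enqueue_prompt.remote("hello, how are you today?")
    # Give the pipeline time to speak before the driver process exits.
    await asyncio.sleep(30)

if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    asyncio.run(main())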