sohojoe committed on
Commit
2bb91de
1 Parent(s): 361f9d4

create instance per prompt;

Browse files
app_interface_actor.py CHANGED
@@ -10,7 +10,7 @@ class AppInterfaceActor:
10
  def __init__(self):
11
  self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
12
  self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
13
- self.audio_output_queue = Queue(maxsize=3000) # Adjust the size as needed
14
  self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
15
  self.debug_str = ""
16
  self.state = "Initializing"
 
10
  def __init__(self):
11
  self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed
12
  self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed
13
+ self.audio_output_queue = Queue(maxsize=50) # Adjust the size as needed
14
  self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed
15
  self.debug_str = ""
16
  self.state = "Initializing"
charles_actor.py CHANGED
@@ -5,6 +5,7 @@ import asyncio
5
  import os
6
  from clip_transform import CLIPTransform
7
  from environment_state_actor import EnvironmentStateActor, EnvironmentState
 
8
  import asyncio
9
  import subprocess
10
 
@@ -34,7 +35,6 @@ class CharlesActor:
34
  from app_interface_actor import AppInterfaceActor
35
  self._app_interface_actor = AppInterfaceActor.get_singleton()
36
  self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()
37
- await self._app_interface_actor.set_state.remote(self._state)
38
 
39
  self.set_state("002 - creating EnvironmentStateActor")
40
  self._environment_state_actor = EnvironmentStateActor.remote()
@@ -44,35 +44,24 @@ class CharlesActor:
44
  self._prompt_manager = PromptManager()
45
 
46
  self.set_state("004 - creating RespondToPromptAsync")
47
- await self._app_interface_actor.set_state.remote(self._state)
48
- from respond_to_prompt_async import RespondToPromptAsync
49
- self._respond_to_prompt = RespondToPromptAsync(self._environment_state_actor, self._audio_output_queue)
50
- self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run())
51
 
52
  self.set_state("005 - create SpeechToTextVoskActor")
53
- await self._app_interface_actor.set_state.remote(self._state)
54
  from speech_to_text_vosk_actor import SpeechToTextVoskActor
55
  self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
56
  # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")
57
-
58
- self._debug_queue = [
59
- # "hello, how are you today?",
60
- # "hmm, interesting, tell me more about that.",
61
- ]
62
 
63
  self.set_state("006 - create Prototypes")
64
- await self._app_interface_actor.set_state.remote(self._state)
65
  from prototypes import Prototypes
66
  self._prototypes = Prototypes()
67
 
68
  self.set_state("007 - create animator")
69
- await self._app_interface_actor.set_state.remote(self._state)
70
  from charles_animator import CharlesAnimator
71
  self._animator = CharlesAnimator()
72
 
73
  self._needs_init = True
74
  self.set_state("010 - Initialized")
75
- await self._app_interface_actor.set_state.remote(self._state)
76
 
77
  async def start(self):
78
  if self._needs_init:
@@ -95,7 +84,6 @@ class CharlesActor:
95
  await render_debug_output(debug_output_history)
96
 
97
  self.set_state("Waiting for input")
98
- await self._app_interface_actor.set_state.remote(self._state)
99
  total_video_frames = 0
100
  skipped_video_frames = 0
101
  total_audio_frames = 0
@@ -113,13 +101,7 @@ class CharlesActor:
113
  is_talking = False
114
  has_spoken_for_this_prompt = False
115
 
116
-
117
- while True:
118
- if len(self._debug_queue) > 0:
119
- prompt = self._debug_queue.pop(0)
120
- self._prompt_manager.append_user_message(prompt)
121
- await self._respond_to_prompt.enqueue_prompt(prompt, self._prompt_manager.messages)
122
-
123
  env_state = await self._environment_state_actor.begin_next_step.remote()
124
  self._environment_state = env_state
125
  audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
@@ -171,7 +153,11 @@ class CharlesActor:
171
  prompt = additional_prompt + ". " + prompt
172
  await add_debug_output(f"👨 {prompt}")
173
  self._prompt_manager.append_user_message(prompt)
174
- await self._respond_to_prompt.enqueue_prompt(prompt, self._prompt_manager.messages)
 
 
 
 
175
  additional_prompt = None
176
  previous_prompt = prompt
177
  is_talking = False
@@ -182,7 +168,11 @@ class CharlesActor:
182
  if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
183
  additional_prompt = previous_prompt
184
  has_spoken_for_this_prompt = True
185
- await self._respond_to_prompt.enqueue_prompt("", self._prompt_manager.messages)
 
 
 
 
186
  if additional_prompt is not None:
187
  prompt = additional_prompt + ". " + prompt
188
  human_preview_text = f"👨❓ {prompt}"
@@ -218,7 +208,7 @@ class CharlesActor:
218
  await render_debug_output(list_of_strings)
219
 
220
 
221
- await asyncio.sleep(0.01)
222
 
223
  # add observations to the environment state
224
  count = len(self._audio_output_queue)
@@ -236,7 +226,6 @@ class CharlesActor:
236
  Is speaking: {is_talking}({count}). \
237
  {vector_debug}\
238
  ", skip_print=True)
239
- await self._app_interface_actor.set_state.remote(self._state)
240
 
241
  def init_ray():
242
  try:
 
5
  import os
6
  from clip_transform import CLIPTransform
7
  from environment_state_actor import EnvironmentStateActor, EnvironmentState
8
+ from respond_to_prompt_async import RespondToPromptAsync
9
  import asyncio
10
  import subprocess
11
 
 
35
  from app_interface_actor import AppInterfaceActor
36
  self._app_interface_actor = AppInterfaceActor.get_singleton()
37
  self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()
 
38
 
39
  self.set_state("002 - creating EnvironmentStateActor")
40
  self._environment_state_actor = EnvironmentStateActor.remote()
 
44
  self._prompt_manager = PromptManager()
45
 
46
  self.set_state("004 - creating RespondToPromptAsync")
47
+ self._respond_to_prompt = None
48
+ self._respond_to_prompt_task = None
 
 
49
 
50
  self.set_state("005 - create SpeechToTextVoskActor")
 
51
  from speech_to_text_vosk_actor import SpeechToTextVoskActor
52
  self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
53
  # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")
 
 
 
 
 
54
 
55
  self.set_state("006 - create Prototypes")
 
56
  from prototypes import Prototypes
57
  self._prototypes = Prototypes()
58
 
59
  self.set_state("007 - create animator")
 
60
  from charles_animator import CharlesAnimator
61
  self._animator = CharlesAnimator()
62
 
63
  self._needs_init = True
64
  self.set_state("010 - Initialized")
 
65
 
66
  async def start(self):
67
  if self._needs_init:
 
84
  await render_debug_output(debug_output_history)
85
 
86
  self.set_state("Waiting for input")
 
87
  total_video_frames = 0
88
  skipped_video_frames = 0
89
  total_audio_frames = 0
 
101
  is_talking = False
102
  has_spoken_for_this_prompt = False
103
 
104
+ while True:
 
 
 
 
 
 
105
  env_state = await self._environment_state_actor.begin_next_step.remote()
106
  self._environment_state = env_state
107
  audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
 
153
  prompt = additional_prompt + ". " + prompt
154
  await add_debug_output(f"👨 {prompt}")
155
  self._prompt_manager.append_user_message(prompt)
156
+ if self._respond_to_prompt_task is not None:
157
+ await self._respond_to_prompt.terminate()
158
+ self._respond_to_prompt_task.cancel()
159
+ self._respond_to_prompt = RespondToPromptAsync(self._environment_state_actor, self._audio_output_queue)
160
+ self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
161
  additional_prompt = None
162
  previous_prompt = prompt
163
  is_talking = False
 
168
  if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
169
  additional_prompt = previous_prompt
170
  has_spoken_for_this_prompt = True
171
+ if self._respond_to_prompt_task is not None:
172
+ await self._respond_to_prompt.terminate()
173
+ self._respond_to_prompt_task.cancel()
174
+ self._respond_to_prompt_task = None
175
+ self._respond_to_prompt = None
176
  if additional_prompt is not None:
177
  prompt = additional_prompt + ". " + prompt
178
  human_preview_text = f"👨❓ {prompt}"
 
208
  await render_debug_output(list_of_strings)
209
 
210
 
211
+ await asyncio.sleep(0.001)
212
 
213
  # add observations to the environment state
214
  count = len(self._audio_output_queue)
 
226
  Is speaking: {is_talking}({count}). \
227
  {vector_debug}\
228
  ", skip_print=True)
 
229
 
230
  def init_ray():
231
  try:
ffmpeg_converter_actor.py CHANGED
@@ -3,21 +3,27 @@ import asyncio
3
  import ray
4
  from ray.util.queue import Queue
5
 
6
- @ray.remote
7
  class FFMpegConverterActor:
8
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
9
  self.output_queue = output_queue
10
  self.buffer_size = buffer_size
11
  self.output_format = output_format
12
-
13
  self.input_pipe = None
14
  self.output_pipe = None
15
-
16
  self.process = None
 
17
 
18
  async def run(self):
19
- while True:
20
- chunk = await self.output_pipe.readexactly(self.buffer_size)
 
 
 
 
 
 
 
 
21
  # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
22
  chunk_ref = ray.put(chunk)
23
  await self.output_queue.put_async(chunk_ref)
@@ -55,14 +61,11 @@ class FFMpegConverterActor:
55
  self.input_pipe.write(chunk)
56
  await self.input_pipe.drain()
57
 
58
- # def has_processed_all_data(self):
59
- # return self.process.poll() is not None
60
-
61
- async def flush_output_queue(self):
62
- while not self.output_queue.empty():
63
- await self.output_queue.get_async()
64
-
65
- def close(self):
66
- self.input_pipe.close()
67
- self.output_pipe.close()
68
- self.process.wait()
 
3
  import ray
4
  from ray.util.queue import Queue
5
 
 
6
  class FFMpegConverterActor:
7
  def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
8
  self.output_queue = output_queue
9
  self.buffer_size = buffer_size
10
  self.output_format = output_format
 
11
  self.input_pipe = None
12
  self.output_pipe = None
 
13
  self.process = None
14
+ self.running = True
15
 
16
  async def run(self):
17
+ while self.running:
18
+ try:
19
+ chunk = await self.output_pipe.readexactly(self.buffer_size)
20
+ except asyncio.IncompleteReadError:
21
+ # exit if we have finished the process
22
+ if self.running == False:
23
+ return
24
+ # If the pipe is broken, restart the process.
25
+ await self.start_process()
26
+ continue
27
  # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
28
  chunk_ref = ray.put(chunk)
29
  await self.output_queue.put_async(chunk_ref)
 
61
  self.input_pipe.write(chunk)
62
  await self.input_pipe.drain()
63
 
64
+ async def close(self):
65
+ self.running = False # Stop the loop inside run()
66
+ if self.process:
67
+ self.process.stdin.transport.close()
68
+ self.process.kill()
69
+ self.process.terminate()
70
+ # while not self.output_queue.empty():
71
+ # await self.output_queue.get_async()
 
 
 
respond_to_prompt_async.py CHANGED
@@ -10,7 +10,6 @@ from environment_state_actor import EnvironmentStateActor
10
  from ffmpeg_converter_actor import FFMpegConverterActor
11
  from agent_response import AgentResponse
12
  import json
13
- from asyncio import Semaphore
14
 
15
  class RespondToPromptAsync:
16
  def __init__(
@@ -18,47 +17,38 @@ class RespondToPromptAsync:
18
  environment_state_actor:EnvironmentStateActor,
19
  audio_output_queue):
20
  voice_id="2OviOUQc1JsQRQgNkVBj"
21
- self.prompt_queue = Queue(maxsize=100)
22
  self.llm_sentence_queue = Queue(maxsize=100)
23
  self.speech_chunk_queue = Queue(maxsize=100)
24
  self.voice_id = voice_id
25
  self.audio_output_queue = audio_output_queue
26
  self.environment_state_actor = environment_state_actor
27
- self.processing_semaphore = Semaphore(1)
28
  self.sentence_queues = []
29
  self.sentence_tasks = []
30
  # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)
31
 
32
- async def enqueue_prompt(self, prompt:str, messages:[str]):
33
- if len(prompt) > 0: # handles case where we just want to flush
34
- await self.prompt_queue.put((prompt, messages))
35
- print("Enqueued prompt")
36
-
37
- async def prompt_to_llm(self):
38
  chat_service = ChatService()
39
 
40
  async with TaskGroup() as tg:
41
- while True:
42
- prompt, messages = await self.prompt_queue.get()
43
- agent_response = AgentResponse(prompt)
44
- async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(messages):
45
- if chat_service.ignore_sentence(text):
46
- is_complete_sentance = False
47
- if not is_complete_sentance:
48
- agent_response['llm_preview'] = text
49
- await self.environment_state_actor.set_llm_preview.remote(text)
50
- continue
51
- agent_response['llm_preview'] = ''
52
- agent_response['llm_sentence'] = text
53
- agent_response['llm_sentences'].append(text)
54
- await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
55
- print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
56
- sentence_response = agent_response.make_copy()
57
- new_queue = Queue()
58
- self.sentence_queues.append(new_queue)
59
- task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
60
- self.sentence_tasks.append(task)
61
- agent_response['llm_sentence_id'] += 1
62
 
63
 
64
  async def llm_sentence_to_speech(self, sentence_response, output_queue):
@@ -79,9 +69,9 @@ class RespondToPromptAsync:
79
  chunk_count += 1
80
 
81
  async def speech_to_converter(self):
82
- self.ffmpeg_converter_actor = FFMpegConverterActor.remote(self.audio_output_queue)
83
- await self.ffmpeg_converter_actor.start_process.remote()
84
- self.ffmpeg_converter_actor.run.remote()
85
 
86
  while True:
87
  for i, task in enumerate(self.sentence_tasks):
@@ -93,12 +83,38 @@ class RespondToPromptAsync:
93
  chunk_response = await queue.get()
94
  audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
95
  audio_chunk = ray.get(audio_chunk_ref)
96
- await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
97
  break
98
 
99
  await asyncio.sleep(0.01)
100
 
101
- async def run(self):
 
102
  async with TaskGroup() as tg: # Use asyncio's built-in TaskGroup
103
- tg.create_task(self.prompt_to_llm())
104
- tg.create_task(self.speech_to_converter())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  from ffmpeg_converter_actor import FFMpegConverterActor
11
  from agent_response import AgentResponse
12
  import json
 
13
 
14
  class RespondToPromptAsync:
15
  def __init__(
 
17
  environment_state_actor:EnvironmentStateActor,
18
  audio_output_queue):
19
  voice_id="2OviOUQc1JsQRQgNkVBj"
 
20
  self.llm_sentence_queue = Queue(maxsize=100)
21
  self.speech_chunk_queue = Queue(maxsize=100)
22
  self.voice_id = voice_id
23
  self.audio_output_queue = audio_output_queue
24
  self.environment_state_actor = environment_state_actor
 
25
  self.sentence_queues = []
26
  self.sentence_tasks = []
27
  # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)
28
 
29
+ async def prompt_to_llm(self, prompt:str, messages:[str]):
 
 
 
 
 
30
  chat_service = ChatService()
31
 
32
  async with TaskGroup() as tg:
33
+ agent_response = AgentResponse(prompt)
34
+ async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(messages):
35
+ if chat_service.ignore_sentence(text):
36
+ is_complete_sentance = False
37
+ if not is_complete_sentance:
38
+ agent_response['llm_preview'] = text
39
+ await self.environment_state_actor.set_llm_preview.remote(text)
40
+ continue
41
+ agent_response['llm_preview'] = ''
42
+ agent_response['llm_sentence'] = text
43
+ agent_response['llm_sentences'].append(text)
44
+ await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
45
+ print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
46
+ sentence_response = agent_response.make_copy()
47
+ new_queue = Queue()
48
+ self.sentence_queues.append(new_queue)
49
+ task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
50
+ self.sentence_tasks.append(task)
51
+ agent_response['llm_sentence_id'] += 1
 
 
52
 
53
 
54
  async def llm_sentence_to_speech(self, sentence_response, output_queue):
 
69
  chunk_count += 1
70
 
71
  async def speech_to_converter(self):
72
+ self.ffmpeg_converter_actor = FFMpegConverterActor(self.audio_output_queue)
73
+ await self.ffmpeg_converter_actor.start_process()
74
+ self.ffmpeg_converter_actor_task = asyncio.create_task(self.ffmpeg_converter_actor.run())
75
 
76
  while True:
77
  for i, task in enumerate(self.sentence_tasks):
 
83
  chunk_response = await queue.get()
84
  audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
85
  audio_chunk = ray.get(audio_chunk_ref)
86
+ await self.ffmpeg_converter_actor.push_chunk(audio_chunk)
87
  break
88
 
89
  await asyncio.sleep(0.01)
90
 
91
+ async def run(self, prompt:str, messages:[str]):
92
+ self.task_group_tasks = []
93
  async with TaskGroup() as tg: # Use asyncio's built-in TaskGroup
94
+ t1 = tg.create_task(self.prompt_to_llm(prompt, messages))
95
+ t2 = tg.create_task(self.speech_to_converter())
96
+ self.task_group_tasks.extend([t1, t2])
97
+
98
+ async def terminate(self):
99
+ # Cancel tasks
100
+ if self.task_group_tasks:
101
+ for task in self.task_group_tasks:
102
+ task.cancel()
103
+ for task in self.sentence_tasks:
104
+ task.cancel()
105
+
106
+ # Close FFmpeg converter actor
107
+ if self.ffmpeg_converter_actor_task:
108
+ self.ffmpeg_converter_actor_task.cancel()
109
+ await self.ffmpeg_converter_actor.close()
110
+ # ray.kill(self.ffmpeg_converter_actor)
111
+
112
+ # Flush all queues
113
+ while not self.llm_sentence_queue.empty():
114
+ await self.llm_sentence_queue.get()
115
+ while not self.speech_chunk_queue.empty():
116
+ await self.speech_chunk_queue.get()
117
+ for sentence_queue in self.sentence_queues:
118
+ while not sentence_queue.empty():
119
+ await sentence_queue.get()
120
+