project_charles / charles_app.py
sohojoe's picture
refactor - move ResponseState to response_state_manager
90a9891
raw
history blame
10.8 kB
import json
import ray
import time
import asyncio
import os
from clip_transform import CLIPTransform
from response_state_manager import ResponseStateManager
from respond_to_prompt_async import RespondToPromptAsync
import asyncio
import subprocess
class CharlesApp:
    """Drive the Charles perception/response loop.

    Each loop iteration pulls audio and video frames from the app interface
    actor, hands audio to the speech-to-text actor, starts a
    ``RespondToPromptAsync`` task when the speaker finishes a prompt, and
    publishes an animator frame plus debug/state text back to the app
    interface actor.
    """

    # Transcripts that exactly match one of these are dropped.
    _PROMPTS_TO_IGNORE = ("um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm")
    # Maximum number of entries kept in, and rendered from, the chat history.
    _MAX_DEBUG_LINES = 10
    # Icons used in the chat history. Written as escapes so they survive a
    # round trip through tools that mis-decode UTF-8.
    _ROBOT_ICON = "\U0001F916"    # robot face
    _HUMAN_ICON = "\U0001F468"    # man
    _PENDING_ICON = "\u2753"      # question mark ornament (marks a preview)

    def __init__(self):
        """Set up cheap local state; actors are created lazily by ``start()``."""
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def set_state(self, state, skip_print=False):
        """Record ``state`` and forward it to the app interface actor.

        Args:
            state: Status text to store in ``self._state``.
            skip_print: When true, do not echo ``state`` to stdout.
        """
        self._state = state
        if not skip_print:
            print(state)
        # The actor handle is only assigned inside _initalize_resources(),
        # so it does not exist yet during the first set_state() call.
        if hasattr(self, '_app_interface_actor'):
            self._app_interface_actor.set_state.remote(self._state)

    async def _initalize_resources(self):
        """Create the actors and helpers used by ``start()``.

        Progress is reported through ``set_state()`` before each step.
        Project modules are imported here rather than at module import time.
        """
        self.set_state("001 - creating AppInterfaceActor")
        from app_interface_actor import AppInterfaceActor
        self._app_interface_actor = AppInterfaceActor.get_singleton()
        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()

        self.set_state("002 - creating ResponseStateManager")
        self._response_state_manager = ResponseStateManager()

        self.set_state("003 - creating PromptManager")
        from prompt_manager import PromptManager
        self._prompt_manager = PromptManager()

        # No responder exists until the first finished prompt; start()
        # creates a new RespondToPromptAsync for every prompt.
        self.set_state("004 - creating RespondToPromptAsync")
        self._respond_to_prompt = None
        self._respond_to_prompt_task = None

        self.set_state("005 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")

        self.set_state("006 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        self.set_state("007 - create animator")
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        # Initialisation is complete; clear the flag so a later start() call
        # does not rebuild every resource. (It was previously left True.)
        self._needs_init = False
        self.set_state("010 - Initialized")

    @classmethod
    def _format_responses(cls, response_state):
        """Render the current responses as one chat-history entry.

        Each response becomes ``"[<chunks>] <response> \\n"`` where
        ``<chunks>`` is its entry in ``speech_chunks_per_response``. The
        robot icon prefixes the whole entry. Returns ``""`` when there are
        no current responses.
        """
        rows = [
            f"[{response_state.speech_chunks_per_response[i]}] {response} \n"
            for i, response in enumerate(response_state.current_responses)
        ]
        if not rows:
            return ""
        return f"{cls._ROBOT_ICON} " + "".join(rows)

    async def _cancel_active_response(self):
        """Terminate and cancel the in-flight response task, if there is one."""
        if self._respond_to_prompt_task is not None:
            await self._respond_to_prompt.terminate()
            self._respond_to_prompt_task.cancel()
            self._respond_to_prompt_task = None
            self._respond_to_prompt = None

    async def start(self):
        """Run the main loop. Initialises resources first if required.

        The loop has no exit condition; it ends only when an exception
        propagates or the surrounding task is cancelled.
        """
        if self._needs_init:
            await self._initalize_resources()

        debug_output_history = []

        async def render_debug_output(list_of_strings):
            # Entries are rendered newest first.
            table_content = "##### Chat history\n" + "".join(
                f"\n{item}\n" for item in reversed(list_of_strings)
            )
            self._charles_actor_debug_output = table_content
            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)

        async def add_debug_output(output):
            debug_output_history.append(output)
            if len(debug_output_history) > self._MAX_DEBUG_LINES:
                debug_output_history.pop(0)
            await render_debug_output(debug_output_history)

        self.set_state("Waiting for input")
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"

        process_speech_to_text_future = []
        human_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        has_spoken_for_this_prompt = False

        while True:
            response_step_obs, response_state = self._response_state_manager.begin_next_step()
            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer.
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                # Only the newest frame is embedded; older frames dequeued
                # in the same iteration are dropped.
                total_video_frames += 1
                image_vector = self._clip_transform.image_to_embeddings(video_frames[-1])[0]
                _, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                # Poll only the oldest pending transcript (timeout=0) so
                # results are consumed in submission order without blocking.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, _ = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    is_real_prompt = len(prompt) > 0 and prompt not in self._PROMPTS_TO_IGNORE

                    if is_real_prompt and speaker_finished:
                        print(f"Prompt: {prompt}")
                        # Commit whatever the robot said for the previous
                        # prompt to the history before starting a new one.
                        line = self._format_responses(response_state)
                        if len(line) > 0:
                            await add_debug_output(line)
                        human_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        await add_debug_output(f"{self._HUMAN_ICON} {prompt}")
                        self._prompt_manager.replace_or_append_user_message(prompt)
                        await self._cancel_active_response()
                        self._respond_to_prompt = RespondToPromptAsync(self._response_state_manager, self._audio_output_queue)
                        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
                        additional_prompt = None
                        previous_prompt = prompt
                        has_spoken_for_this_prompt = False
                        response_step_obs, response_state = self._response_state_manager.reset_episode()
                    elif is_real_prompt:
                        # Sometimes we get a false signal of speaker_finished,
                        # in which case new speech arrives before we have
                        # spoken. Abort the pending response and prepend the
                        # previous prompt to what the user says next.
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            await self._cancel_active_response()
                            response_step_obs, response_state = self._response_state_manager.reset_episode()
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        human_preview_text = f"{self._HUMAN_ICON}{self._PENDING_ICON} {prompt}"

            for new_response in response_step_obs.llm_responses:
                self._prompt_manager.append_assistant_message(new_response)

            robot_preview_text = ""
            if len(response_step_obs.llm_preview):
                robot_preview_text = f"{self._ROBOT_ICON}{self._PENDING_ICON} {response_step_obs.llm_preview}"

            # Render the committed history plus the in-progress robot and
            # human previews. The previews are not stored in the history.
            list_of_strings = debug_output_history.copy()
            line = self._format_responses(response_state)
            if len(robot_preview_text) > 0:
                line += robot_preview_text + " \n"
            list_of_strings.append(line)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > self._MAX_DEBUG_LINES:
                list_of_strings.pop(0)
            await render_debug_output(list_of_strings)

            # Yield to the event loop so the response task can make progress.
            await asyncio.sleep(0.001)

            # Add observations to the environment state: a non-empty audio
            # output queue is treated as "currently talking".
            count = len(self._audio_output_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            frame_ref = ray.put(frame)
            await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)

            loops += 1
            self.set_state(
                f"Processed {total_video_frames} video frames "
                f"and {total_audio_frames} audio frames, "
                f"loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. "
                f"Is speaking: {is_talking}({count}). "
                f"{vector_debug}",
                skip_print=True)
def init_ray():
    """Start a local Ray head node (best effort) and connect this process.

    A failure of ``ray start`` is printed and otherwise ignored. The
    connection uses the address in the ``RAY_ADDRESS`` environment variable
    when it is set and non-empty, and Ray's default otherwise. The namespace
    is always ``project_charles``.
    """
    start_command = ["ray", "start", "--include-dashboard=True", "--head"]
    try:
        subprocess.check_output(start_command)
    except Exception as e:
        print (f"charles_actor.py init_ray: {e}")

    # Connect to a running Ray cluster.
    while not ray.is_initialized():
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        positional = (ray_address,) if ray_address else ()
        ray.init(*positional, namespace="project_charles")
async def main():
    """Connect to Ray if this process has not done so, then run the app."""
    already_connected = ray.is_initialized()
    if not already_connected:
        init_ray()

    app = CharlesApp()
    await app.start()
if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated in current Python releases.
    asyncio.run(main())