import json import ray import time import asyncio import os from clip_transform import CLIPTransform from response_state_manager import ResponseStateManager from respond_to_prompt_async import RespondToPromptAsync import asyncio import subprocess import pid_helper class CharlesApp: def __init__(self): self._needs_init = True self._charles_actor_debug_output = "" self._state = "Initializing" self._clip_transform = CLIPTransform() def set_state(self, state, skip_print=False): self._state = state if not skip_print: print(state) # check if self._app_interface_actor exists if hasattr(self, '_app_interface_actor'): self._app_interface_actor.set_state.remote(self._state) async def _initalize_resources(self): # Initialize resources self.set_state("001 - creating AppInterfaceActor") from app_interface_actor import AppInterfaceActor self._app_interface_actor = AppInterfaceActor.get_singleton() await self._app_interface_actor.add_charles_app_pid.remote(pid_helper.get_current_pid()) self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote() self.set_state("002 - creating ResponseStateManager") self._response_state_manager = ResponseStateManager() self.set_state("003 - creating PromptManager") from prompt_manager import PromptManager self._prompt_manager = PromptManager() self.set_state("004 - creating RespondToPromptAsync") self._respond_to_prompt = None self._respond_to_prompt_task = None self.set_state("005 - create SpeechToTextVoskActor") from speech_to_text_vosk_actor import SpeechToTextVoskActor self._speech_to_text_actor = SpeechToTextVoskActor.remote("small") # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big") self.set_state("006 - create Prototypes") from prototypes import Prototypes self._prototypes = Prototypes() self.set_state("007 - create animator") from charles_animator import CharlesAnimator self._animator = CharlesAnimator() self._needs_init = True self.set_state("010 - Initialized") async def cancel_response_task(self): if self._respond_to_prompt_task is None: return await self._respond_to_prompt.terminate() self._respond_to_prompt_task.cancel() self._respond_to_prompt_task = None self._respond_to_prompt = None async def start(self): if self._needs_init: await self._initalize_resources() debug_output_history = [] async def render_debug_output(list_of_strings): table_content = "##### Chat history\n" for item in reversed(list_of_strings): # table_content += f"\n```markdown\n{item}\n```\n" table_content += f"\n{item}\n" self._charles_actor_debug_output = table_content await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output) async def add_debug_output(output): debug_output_history.append(output) if len(debug_output_history) > 10: debug_output_history.pop(0) await render_debug_output(debug_output_history) self.set_state("Waiting for input") total_video_frames = 0 skipped_video_frames = 0 total_audio_frames = 0 loops = 0 start_time = time.time() vector_debug = "--n/a--" process_speech_to_text_future = [] human_preview_text = "" additional_prompt = None previous_prompt = "" is_talking = False has_spoken_for_this_prompt = False while True: response_step_obs, response_state = self._response_state_manager.begin_next_step() audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote() video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote() if len(audio_frames) > 0: total_audio_frames += len(audio_frames) # Concatenate all audio frames into a single buffer audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames]) future = self._speech_to_text_actor.process_speech.remote(audio_buffer) process_speech_to_text_future.append(future) # audio_frames_task = None if len(video_frames) > 0: vector_debug = f"found {len(video_frames)} video frames" total_video_frames += 1 skipped_video_frames += (len(video_frames) -1) image_as_array = video_frames[-1] image_vector = self._clip_transform.image_to_embeddings(image_as_array) image_vector = image_vector[0] distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector) vector_debug = f"{closest_item_key} {distance_debug_str}" if len(process_speech_to_text_future) > 0: ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0) if ready: prompt, speaker_finished, raw_json = await process_speech_to_text_future[0] del process_speech_to_text_future[0] prompts_to_ignore = ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"] if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore: print(f"Prompt: {prompt}") response_preview_text = self._response_state_manager.pretty_print_current_responses() if len(response_preview_text) > 0: await add_debug_output(response_preview_text) human_preview_text = "" if additional_prompt is not None: prompt = additional_prompt + ". " + prompt await add_debug_output(f"👨 {prompt}") self._prompt_manager.replace_or_append_user_message(prompt) await self.cancel_response_task() self._respond_to_prompt = RespondToPromptAsync(self._response_state_manager, self._audio_output_queue) self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages)) additional_prompt = None previous_prompt = prompt is_talking = False has_spoken_for_this_prompt = False response_step_obs, response_state = self._response_state_manager.reset_episode() elif len(prompt) > 0 and prompt not in prompts_to_ignore: # sometimes we get a false signal of speaker_finsihed # in which case we get new prompts before we have spoken if len(previous_prompt) > 0 and not has_spoken_for_this_prompt: additional_prompt = previous_prompt has_spoken_for_this_prompt = True await self.cancel_response_task() response_step_obs, response_state = self._response_state_manager.reset_episode() if additional_prompt is not None: prompt = additional_prompt + ". " + prompt human_preview_text = f"👨❓ {prompt}" # await self.cancel_response_task() # TODO re-enable to interupt when user speaks # i choose to add each line of responce one at a time as them come in for new_response in response_step_obs.llm_responses: self._prompt_manager.append_assistant_message(new_response) list_of_strings = debug_output_history.copy() robot_preview_text = self._response_state_manager.pretty_print_preview_text() response_preview_text = self._response_state_manager.pretty_print_current_responses() if len(robot_preview_text) > 0: response_preview_text += robot_preview_text+" \n" list_of_strings.append(response_preview_text) if len(human_preview_text) > 0: list_of_strings.append(human_preview_text) if len(list_of_strings) > 10: list_of_strings.pop(0) await render_debug_output(list_of_strings) await asyncio.sleep(0.001) # add observations to the environment state count = len(self._audio_output_queue) is_talking = bool(count > 0) has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking frame = self._animator.update(is_talking) frame_ref = ray.put(frame) await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref) loops+=1 self.set_state( f"Processed {total_video_frames} video frames \ and {total_audio_frames} audio frames, \ loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. \ Is speaking: {is_talking}({count}). \ {vector_debug}\ ", skip_print=True) def init_ray(): try: subprocess.check_output(["ray", "start", "--include-dashboard=True", "--head"]) except Exception as e: print (f"charles_actor.py init_ray: {e}") # Connect to a running Ray cluster while not ray.is_initialized(): time.sleep(0.1) ray_address = os.getenv('RAY_ADDRESS') if ray_address: ray.init(ray_address, namespace="project_charles") else: ray.init(namespace="project_charles") async def main(): if not ray.is_initialized(): init_ray() charles_actor = CharlesApp() await charles_actor.start() if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main())