"""Charles actor: Ray actor driving the audio/video -> prompt -> response loop.

The module defines:

* ``CharlesActor`` - a Ray actor that polls audio and video frames from a
  ``StreamlitAVQueue``, sends audio to a speech-to-text actor, forwards
  finished prompts to a ``RespondToPromptActor`` and keeps a markdown debug
  transcript of the conversation.
* ``init_ray`` - best-effort start of a local Ray head node plus ``ray.init``.
* ``main`` - script entry point that starts the actor and polls its state.
"""
import asyncio
import json
import os
import subprocess
import time

import ray

from clip_transform import CLIPTransform
from environment_state_actor import EnvironmentStateActor, EnvironmentState
from agent_state_actor import AgentStateActor

# Maximum number of entries kept in the debug transcript history.
_MAX_DEBUG_LINES = 10

# Filler utterances that are never treated as a prompt (nor shown as preview).
_PROMPTS_TO_IGNORE = ("um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm")


def _format_response_lines(responses, chunk_counts):
    """Render the robot's responses as a single transcript entry.

    Every response is written as ``[<chunk count>] <response>`` followed by a
    space and a newline; the entry as a whole is prefixed with the robot
    marker. Returns an empty string when ``responses`` is empty.

    ``responses`` and ``chunk_counts`` are expected to have the same length
    (the caller appends to and resets both lists together).
    """
    if not responses:
        return ""
    body = "".join(
        f"[{count}] {response} \n"
        for response, count in zip(responses, chunk_counts)
    )
    return "🤖 " + body


@ray.remote
class CharlesActor:
    """Ray actor that runs the main perception / conversation loop.

    Heavy resources are created lazily by ``start`` (via
    ``_initalize_resources``); the getters expose the current status string,
    the markdown debug transcript and the latest environment state.
    """

    def __init__(self):
        self._needs_init = True
        self._charles_actor_debug_output = ""
        # Initialize as EnvironmentState so the getter is usable before start().
        self._environment_state: EnvironmentState = EnvironmentState(episode=0, step=0)
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def get_state(self) -> str:
        """Return the human-readable status string."""
        return self._state

    def get_charles_actor_debug_output(self) -> str:
        """Return the markdown debug transcript last rendered by ``start``."""
        return self._charles_actor_debug_output

    def get_environment_state(self) -> EnvironmentState:
        """Return the environment state captured at the latest loop step."""
        return self._environment_state

    async def _initalize_resources(self):
        """Create the queues, helper actors and prototypes used by ``start``.

        ``self._state`` is updated before each stage so pollers can see how
        far initialization has progressed.
        """
        # NOTE(review): the project imports below are done inside the method
        # rather than at module level - presumably to defer them until the
        # actor process is running; confirm before hoisting them.
        print("000 - create StreamlitAVQueue")
        self._state = "000 - creating StreamlitAVQueue"
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()

        print("001 - create RespondToPromptActor")
        self._state = "001 - creating RespondToPromptActor"
        from respond_to_prompt_actor import RespondToPromptActor
        self._environment_state_actor = EnvironmentStateActor.remote()
        self._agent_state_actor = AgentStateActor.remote()
        self._respond_to_prompt_actor = RespondToPromptActor.remote(
            self._environment_state_actor,
            self._out_audio_queue,
        )

        print("002 - create SpeechToTextVoskActor")
        self._state = "002 - creating SpeechToTextVoskActor"
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")

        # Prompts placed here are enqueued at the start of the main loop
        # without going through speech-to-text (useful for debugging).
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]

        print("003 - create Prototypes")
        self._state = "003 - creating Prototypes"
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        print("010")
        # Initialization is complete; a later start() call on the same actor
        # must not rebuild every resource.
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Run the main loop; does not return under normal operation.

        Each iteration:

        1. enqueues any pending debug prompt,
        2. begins the next environment / agent step,
        3. pulls audio and video frames from the AV queue,
        4. submits audio to the speech-to-text actor and embeds the newest
           video frame to find the closest prototype,
        5. consumes the oldest finished speech-to-text result, enqueueing the
           prompt once the speaker has finished,
        6. re-renders the debug transcript and updates the status string.
        """
        if self._needs_init:
            await self._initalize_resources()

        debug_output_history = []

        def render_debug_output(list_of_strings):
            # Newest entry first.
            entries = "".join(f"\n{item}\n" for item in reversed(list_of_strings))
            self._charles_actor_debug_output = "##### Chat history\n" + entries

        def add_debug_output(output):
            debug_output_history.append(output)
            if len(debug_output_history) > _MAX_DEBUG_LINES:
                debug_output_history.pop(0)
            render_debug_output(debug_output_history)

        self._state = "Waiting for input"
        total_video_frames = 0
        skipped_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"

        process_speech_to_text_future = []
        current_responses = []
        speech_chunks_per_response = []
        human_preview_text = ""
        robot_preview_text = ""

        while True:
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            env_state = await self._environment_state_actor.begin_next_step.remote()
            self._environment_state = env_state
            # Fire-and-forget: the result of begin_step is not awaited.
            self._agent_state_actor.begin_step.remote()

            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
            video_frames = await self._streamlit_av_queue.get_video_frames_async()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                total_video_frames += 1
                # Only the newest frame is embedded; the rest are skipped.
                skipped_video_frames += (len(video_frames) - 1)
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                _, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                # Non-blocking check of the oldest outstanding request only,
                # so results are consumed in submission order.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, _ = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    is_real_prompt = len(prompt) > 0 and prompt not in _PROMPTS_TO_IGNORE
                    if speaker_finished and is_real_prompt:
                        print(f"Prompt: {prompt}")
                        # Commit the robot's responses so far to the history
                        # before recording the new human prompt.
                        line = _format_response_lines(
                            current_responses, speech_chunks_per_response
                        )
                        if len(line) > 0:
                            add_debug_output(line)
                        add_debug_output(f"👨 {prompt}")
                        current_responses = []
                        speech_chunks_per_response = []
                        # Clear this step's local snapshot so the loops below
                        # do not re-add responses made to the previous prompt.
                        env_state.llm_preview = ""
                        env_state.llm_responses = []
                        env_state.tts_raw_chunk_ids = []
                        human_preview_text = ""
                        robot_preview_text = ""
                        await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
                    elif is_real_prompt:
                        human_preview_text = f"👨❓ {prompt}"

            for new_response in env_state.llm_responses:
                current_responses.append(new_response)
                speech_chunks_per_response.append(0)

            robot_preview_text = ""
            if len(env_state.llm_preview):
                robot_preview_text = f"🤖❓ {env_state.llm_preview}"

            for chunk_json in env_state.tts_raw_chunk_ids:
                chunk = json.loads(chunk_json)
                response_id = chunk['llm_sentence_id']
                # NOTE(review): assumes llm_sentence_id is a valid index into
                # the responses collected since the last prompt; an id outside
                # that range raises IndexError here - confirm against the
                # producer of tts_raw_chunk_ids.
                speech_chunks_per_response[response_id] += 1

            # Render history plus the in-progress robot / human previews
            # without committing the previews to the history.
            list_of_strings = debug_output_history.copy()
            line = _format_response_lines(current_responses, speech_chunks_per_response)
            if len(robot_preview_text) > 0:
                line += robot_preview_text+" \n"
            list_of_strings.append(line)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > _MAX_DEBUG_LINES:
                list_of_strings.pop(0)
            render_debug_output(list_of_strings)

            await asyncio.sleep(0.01)
            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"


def init_ray():
    """Start a local Ray head node (best effort) and connect to Ray.

    If the ``RAY_ADDRESS`` environment variable is set, ``ray.init`` connects
    to that address; otherwise it is called without an address. Both paths
    use the ``project_charles`` namespace.
    """
    try:
        subprocess.check_output(["ray", "start", "--head"])
    except Exception as e:
        # Deliberately best-effort: the failure is only reported and we still
        # try to connect below.
        print (f"charles_actor.py init_ray: {e}")
    # Connect to a running Ray cluster
    while not ray.is_initialized():
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")


async def main():
    """Start (or attach to) the named ``CharlesActor`` and poll its state.

    Prints the actor's status string whenever the environment episode or step
    changes, and returns once the actor's ``start`` call has terminated.
    """
    if not ray.is_initialized():
        init_ray()

    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    last_step = -1
    last_episode = -1
    try:
        while True:
            ready, _ = ray.wait([future], timeout=0)
            if ready:
                # The start method has terminated. You can fetch the result (if any) with ray.get().
                # If the method raised an exception, it will be re-raised here.
                try:
                    result = ray.get(future)
                    print(f"The start method has terminated with result: {result}")
                except Exception as e:
                    print(f"The start method raised an exception: {e}")
                break
            else:
                # The start method is still running. You can poll for debug information here.
                await asyncio.sleep(1)
                state = await charles_actor.get_state.remote()
                env_state = await charles_actor.get_environment_state.remote()
                if (env_state.episode != last_episode) or (env_state.step != last_step):
                    last_episode = env_state.episode
                    last_step = env_state.step
                    print(f"Charles is in state: {state}")
                    # if len(env_state.llm_preview):
                    #     print (f"llm_preview: {env_state.llm_preview}")
                    # if len(env_state.llm_responses):
                    #     print (f"llm_responses: {env_state.llm_responses}")
                    # if len(env_state.tts_raw_chunk_ids):
                    #     for chunk_json in env_state.tts_raw_chunk_ids:
                    #         chunk = json.loads(chunk_json)
                    #         prompt = chunk['prompt']
                    #         line = chunk['llm_sentence_id']
                    #         chunk_id = chunk['chunk_count']
                    #         print(f"Prompt: {prompt}, Line: {line}, Chunk: {chunk_id}")
    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Python 3.11+ asyncio.run() delivers Ctrl-C to the main task as a
        # cancellation, so both exception types mean "stopped by the user".
        print("Script was manually terminated")
        raise  # bare raise keeps the original traceback


if __name__ == "__main__":
    # asyncio.run() creates and closes the event loop itself;
    # asyncio.get_event_loop() without a running loop is deprecated and
    # raises RuntimeError on Python 3.14+.
    asyncio.run(main())