# project_charles / charles_actor.py
# Commit 149eeaf (sohojoe): "refactor: use more of a MDP style structure"
# File size: 9.3 kB
import json
import ray
import time
import asyncio
import os
from clip_transform import CLIPTransform
from environment_state_actor import EnvironmentStateActor, EnvironmentState
from agent_state_actor import AgentStateActor
import asyncio
@ray.remote
class CharlesActor:
    """Ray actor that runs the main Charles input/response loop.

    Each loop iteration (see ``start``) begins a new environment step, pulls
    pending audio and video frames from a ``StreamlitAVQueue``, forwards audio
    to the speech-to-text actor, embeds the most recent video frame with
    ``CLIPTransform`` and forwards finished prompts to
    ``RespondToPromptActor``. Status is exposed to other processes through
    the ``get_*`` methods.
    """

    def __init__(self):
        # Queues and child actors are created lazily by start(), not here.
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._environment_state: EnvironmentState = EnvironmentState(episode=0, step=0)  # Initialize as EnvironmentState
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def get_state(self):
        """Return the current human-readable status string."""
        return self._state

    def get_charles_actor_debug_output(self):
        """Return the debug history rendered as a markdown table string."""
        return self._charles_actor_debug_output

    def get_environment_state(self) -> EnvironmentState:
        """Return the environment state captured at the start of the latest step."""
        return self._environment_state

    async def _initalize_resources(self):
        """Create the AV queue, the child actors and the prototypes helper.

        Project modules are imported here rather than at module level, as in
        the original code. On completion the actor is marked as initialized
        so that a later ``start()`` call does not rebuild everything.
        """
        print("000 - create StreamlitAVQueue")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()

        print("001 - create RespondToPromptActor")
        from respond_to_prompt_actor import RespondToPromptActor
        self._environment_state_actor = EnvironmentStateActor.remote()
        self._agent_state_actor = AgentStateActor.remote()
        self._respond_to_prompt_actor = RespondToPromptActor.remote(
            self._environment_state_actor, self._out_audio_queue
        )

        print("002 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")

        # Prompts placed here are sent to the responder before any live input.
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]

        print("003 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()
        print("010")

        # Initialization is complete; the original left this flag True, which
        # made every start() call re-create all resources.
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Run the processing loop; does not return under normal operation.

        Initializes resources on first use, then loops forever: begins an
        environment step, gathers audio/video frames, dispatches
        speech-to-text work, polls for finished transcriptions, and updates
        the debug table and status string.
        """
        if self._needs_init:
            await self._initalize_resources()

        table_header = "| Charles Actor debug history |\n| --- |\n"
        debug_output_history = []

        def render_debug_table(entries):
            # Newest entry first.
            rows = "\n".join(f"| {item} |" for item in reversed(entries))
            self._charles_actor_debug_output = table_header + rows

        def add_debug_output(output):
            # Permanently record an entry, keeping only the 10 most recent.
            debug_output_history.append(output)
            if len(debug_output_history) > 10:
                debug_output_history.pop(0)
            render_debug_table(debug_output_history)

        def preview_debug_output(output):
            # Show a provisional entry without storing it in the history.
            render_debug_table(debug_output_history + [output])

        # Transcriptions that are not forwarded as prompts or shown as previews.
        prompts_to_ignore = ("um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm")

        self._state = "Waiting for input"
        total_video_frames = 0
        skipped_video_frames = 0  # accumulated below; not currently reported
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"
        # Speech-to-text results still in flight, oldest first.
        process_speech_to_text_future = []

        while True:
            if self._debug_queue:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            env_state = await self._environment_state_actor.begin_next_step.remote()
            self._environment_state = env_state
            # Result is intentionally not awaited.
            self._agent_state_actor.begin_step.remote()
            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
            video_frames = await self._streamlit_av_queue.get_video_frames_async()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                total_video_frames += 1
                skipped_video_frames += (len(video_frames) - 1)
                # Only the most recent frame is embedded; earlier ones are skipped.
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                _, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            human_preview_text = ""
            robot_preview_text = ""
            if len(process_speech_to_text_future) > 0:
                # timeout=0 polls the oldest pending result without waiting for it.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, _raw_json = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]
                    if len(prompt) > 0 and prompt not in prompts_to_ignore:
                        if speaker_finished:
                            print(f"Prompt: {prompt}")
                            add_debug_output(f"👨 {prompt}")
                            await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
                        else:
                            human_preview_text = f"👨❓ {prompt}"

            for new_response in env_state.llm_responses:
                add_debug_output(f"🤖 {new_response}")
            if len(env_state.llm_preview):
                robot_preview_text = f"🤖❓ {env_state.llm_preview}"

            # A human preview takes precedence over a robot preview.
            if len(human_preview_text) > 0:
                preview_debug_output(human_preview_text)
            elif len(robot_preview_text) > 0:
                preview_debug_output(robot_preview_text)

            await asyncio.sleep(0.01)
            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"
async def main():
    """Start (or attach to) the named CharlesActor and report its progress.

    Initializes Ray if needed, using the ``RAY_ADDRESS`` environment variable
    when it is set. It then launches the actor's ``start`` method and polls
    once per second, printing the actor state whenever the environment
    episode or step has changed. The loop ends when ``start`` terminates.
    """
    if not ray.is_initialized():
        # Try to connect to a running Ray cluster
        cluster_address = os.getenv('RAY_ADDRESS')
        if cluster_address:
            ray.init(cluster_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    actor_handle = CharlesActor.options(name="CharlesActor", get_if_exists=True).remote()
    start_ref = actor_handle.start.remote()
    # (episode, step) most recently reported; (-1, -1) forces a first report.
    reported_position = (-1, -1)

    try:
        while True:
            finished, _pending = ray.wait([start_ref], timeout=0)
            if finished:
                # The start method has terminated. Fetching the result
                # re-raises any exception the method raised.
                try:
                    outcome = ray.get(start_ref)
                    print(f"The start method has terminated with result: {outcome}")
                except Exception as error:
                    print(f"The start method raised an exception: {error}")
                break

            # The start method is still running: poll for debug information.
            await asyncio.sleep(1)
            state = await actor_handle.get_state.remote()
            env_state = await actor_handle.get_environment_state.remote()
            current_position = (env_state.episode, env_state.step)
            if current_position != reported_position:
                reported_position = current_position
                print(f"Charles is in state: {state}")
                # Further env_state fields (llm_preview, llm_responses,
                # tts_raw_chunk_ids) could be printed here when debugging.
    except KeyboardInterrupt as interrupt:
        print("Script was manually terminated")
        raise interrupt
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on newer Python versions.
    asyncio.run(main())