project_charles / charles_actor.py
sohojoe's picture
fix: should be raise, now throw
dc249ac
raw
history blame
5.41 kB
import ray
import time
import asyncio
import os
@ray.remote
class CharlesActor:
def __init__(self):
self._needs_init = True
self._system_one_audio_history_output = ""
self._state = "Initializing"
def get_state(self):
return self._state
def get_system_one_audio_history_output(self):
return self._system_one_audio_history_output
async def _initalize_resources(self):
# Initialize resources
print("000")
from streamlit_av_queue import StreamlitAVQueue
self._streamlit_av_queue = StreamlitAVQueue()
print("002")
from speech_to_text_vosk_actor import SpeechToTextVoskActor
self._speech_to_text_actor = SpeechToTextVoskActor.remote()
print("003")
from respond_to_prompt_actor import RespondToPromptActor
self._respond_to_prompt_actor = RespondToPromptActor.remote()
self._debug_queue = [
# "hello, how are you today?",
# "hmm, interesting, tell me more about that.",
]
print("010")
self._needs_init = True
self._state = "Initialized"
async def start(self):
if self._needs_init:
await self._initalize_resources()
system_one_audio_history = []
self._state = "Waiting for input"
total_video_frames = 0
total_audio_frames = 0
loops = 0
process_speech_to_text_future = []
while True:
if len(self._debug_queue) > 0:
prompt = self._debug_queue.pop(0)
self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
audio_frames = await self._streamlit_av_queue.get_audio_frames_async()
if len(audio_frames) > 0:
total_audio_frames += len(audio_frames)
# Concatenate all audio frames into a single buffer
audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
process_speech_to_text_future.append(future)
# audio_frames_task = None
if len(process_speech_to_text_future) > 0:
ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
if ready:
prompt, speaker_finished = ray.get(process_speech_to_text_future[0])
del process_speech_to_text_future[0]
if speaker_finished and len(prompt) > 0:
print(f"Prompt: {prompt}")
system_one_audio_history.append(prompt)
if len(system_one_audio_history) > 10:
system_one_audio_history = system_one_audio_history[-10:]
table_content = "| System 1 Audio History |\n| --- |\n"
table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
self._system_one_audio_history_output = table_content
self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
# video_frames = await self._streamlit_av_queue.get_video_frames_async()
# if len(video_frames) > 0:
# total_video_frames += len(video_frames)
# # for video_frame in video_frames:
# # system_one_video_output.image(video_frame.to_ndarray())
# # pass
# update debug output
if (total_video_frames >0 or total_audio_frames > 0):
self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames"
await asyncio.sleep(0.01)
loops+=1
self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}"
if __name__ == "__main__":
if not ray.is_initialized():
# Try to connect to a running Ray cluster
ray_address = os.getenv('RAY_ADDRESS')
if ray_address:
ray.init(ray_address, namespace="project_charles")
else:
ray.init(namespace="project_charles")
charles_actor = CharlesActor.options(
name="CharlesActor",
get_if_exists=True,
).remote()
future = charles_actor.start.remote()
try:
while True:
ready, _ = ray.wait([future], timeout=0)
if ready:
# The start method has terminated. You can fetch the result (if any) with ray.get().
# If the method raised an exception, it will be re-raised here.
try:
result = ray.get(future)
print(f"The start method has terminated with result: {result}")
except Exception as e:
print(f"The start method raised an exception: {e}")
break
else:
# The start method is still running. You can poll for debug information here.
time.sleep(1)
state = charles_actor.get_state.remote()
print(f"Charles is in state: {ray.get(state)}")
except KeyboardInterrupt as e:
print("Script was manually terminated")
raise(e)