# NOTE(review): the following header lines were HTML-scrape residue
# ("Spaces: / Sleeping / Sleeping") and are not valid Python; kept only
# as this comment so the file parses.
import asyncio
import os
import time

import ray
class CharlesActor:
    """Long-running actor that drains microphone audio from a Streamlit AV
    queue, runs speech-to-text on it, and forwards finished prompts to a
    response pipeline.

    NOTE(review): ``__main__`` below instantiates this class via
    ``CharlesActor.options(...).remote()``, which only works on a class
    decorated with ``@ray.remote`` — the decorator is not visible here;
    confirm it was not lost.
    """

    def __init__(self):
        # Heavy resources (queues, helper actors) are created lazily in
        # _initialize_resources(), so construction stays cheap.
        self._needs_init = True
        self._system_one_audio_history_output = ""
        self._state = "Initializing"

    def get_state(self):
        """Return a short human-readable status string (polled remotely by the driver)."""
        return self._state

    def get_system_one_audio_history_output(self):
        """Return the markdown table of the most recent recognized prompts."""
        return self._system_one_audio_history_output

    async def _initialize_resources(self):
        """Create the AV queue and the converter / STT / responder actors.

        Project imports are deferred into the method body so the module can
        be imported without these dependencies installed.  The numeric
        ``print``s are progress breadcrumbs for startup debugging.
        """
        print("000")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()
        print("001")
        from ffmpeg_converter_actor import FFMpegConverterActor
        self._ffmpeg_converter_actor = FFMpegConverterActor.remote(self._out_audio_queue)
        await self._ffmpeg_converter_actor.start_process.remote()
        # Fire-and-forget: the converter's run loop keeps executing remotely;
        # its ObjectRef is deliberately not awaited.
        self._ffmpeg_converter_actor.run.remote()
        print("002")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote()
        print("003")
        from respond_to_prompt_actor import RespondToPromptActor
        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._ffmpeg_converter_actor)
        # Canned prompts that bypass speech recognition; useful when testing
        # the response pipeline without a microphone.
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]
        print("010")
        # BUG FIX: this was `True`, which made the guard in start()
        # re-initialize everything on every invocation.
        self._needs_init = False
        self._state = "Initialized"

    # Backward-compatible alias for the original (misspelled) method name,
    # in case external code calls it.
    _initalize_resources = _initialize_resources

    async def start(self):
        """Main processing loop: drain audio frames, run STT, enqueue prompts.

        Runs forever (the loop has no break); state strings exposed via
        get_state() are the only progress reporting.
        """
        if self._needs_init:
            await self._initialize_resources()
        system_one_audio_history = []
        self._state = "Waiting for input"
        total_video_frames = 0  # currently always 0; video path is disabled
        total_audio_frames = 0
        loops = 0
        process_speech_to_text_future = []
        while True:
            # Debug prompts (if any) go straight to the responder.
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer before
                # handing it to the STT actor.
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)
            # Non-blocking poll of the OLDEST in-flight STT request so
            # results are consumed strictly in submission order.
            if len(process_speech_to_text_future) > 0:
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]
                    if speaker_finished and len(prompt) > 0:
                        print(f"Prompt: {prompt}")
                        system_one_audio_history.append(prompt)
                        # Keep only the 10 most recent prompts.
                        if len(system_one_audio_history) > 10:
                            system_one_audio_history = system_one_audio_history[-10:]
                        # Render newest-first as a markdown table for the UI.
                        table_content = "| System 1 Audio History |\n| --- |\n"
                        table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                        self._system_one_audio_history_output = table_content
                        await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)
            # update debug output
            if (total_video_frames > 0 or total_audio_frames > 0):
                self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames"
            # Yield to the event loop so other coroutines/actors can run.
            await asyncio.sleep(0.01)
            loops += 1
        # NOTE(review): unreachable — the while-loop above never breaks.
        self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}"
if __name__ == "__main__":
    # Connect to an existing cluster when RAY_ADDRESS is set; otherwise
    # start (or join) a local one in the shared namespace.
    if not ray.is_initialized():
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    # Named, idempotent actor handle: reuse an existing CharlesActor if one
    # is already registered under this name.
    actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    start_ref = actor.start.remote()

    try:
        while True:
            done, _ = ray.wait([start_ref], timeout=0)
            if not done:
                # The start method is still running. You can poll for debug information here.
                time.sleep(1)
                state = actor.get_state.remote()
                print(f"Charles is in state: {ray.get(state)}")
                continue
            # The start method has terminated. You can fetch the result (if any) with ray.get().
            # If the method raised an exception, it will be re-raised here.
            try:
                result = ray.get(start_ref)
                print(f"The start method has terminated with result: {result}")
            except Exception as e:
                print(f"The start method raised an exception: {e}")
            break
    except KeyboardInterrupt as e:
        print("Script was manually terminated")
        raise(e)