"""Streamlit front end for Project Charles.

Starts (or connects to) Ray, launches the ``charles_actor.py`` subprocess,
streams the browser's camera/microphone through ``streamlit_webrtc`` into a
``StreamlitAVQueue``, and polls the ``CharlesActor`` Ray actor for debug
output and state to display on the page.
"""
import asyncio
from collections import deque
import os
import sys
import threading
import time
import traceback
import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
import torch
# import av
# import cv2
from sample_utils.turn import get_ice_servers
import json
from typing import List
import subprocess
import ray

st.set_page_config(layout="wide")

from vosk import SetLogLevel, Model, KaldiRecognizer
SetLogLevel(-1)  # mutes vosk verbosity

from dotenv import load_dotenv
load_dotenv()

# Set by main() so the module-level exception handler can release it.
webrtc_ctx = None


@st.cache_resource
def init_ray():
    """Start a Ray head node (best effort) and connect this process to Ray.

    ``ray start --head`` is attempted first; any failure is printed and
    ignored. The process then calls ``ray.init`` in the
    ``project_charles`` namespace, using the ``RAY_ADDRESS`` environment
    variable as the address when it is set and non-empty.
    """
    try:
        subprocess.check_output(["ray", "start", "--head"])
    except Exception as e:
        print (f"app.py init_ray: {e}")
    # Connect to a running Ray cluster
    while not ray.is_initialized():
        time.sleep(0.1)
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")


@st.cache_resource
def get_charles_actor():
    """Launch ``charles_actor.py`` and return a handle to ``CharlesActor``.

    Spawns the actor script with the current interpreter, then polls
    ``ray.get_actor("CharlesActor")`` every 0.1 s until it stops raising
    ``ValueError``. This call blocks until the actor is found.
    """
    # The Popen object is intentionally not kept; the child keeps running.
    subprocess.Popen([sys.executable, "charles_actor.py"])
    charles_actor_instance = None
    while charles_actor_instance is None:
        try:
            charles_actor_instance = ray.get_actor("CharlesActor")
        except ValueError:
            time.sleep(0.1)  # give the subprocess a chance to start
    return charles_actor_instance


@st.cache_resource
def get_streamlit_av_queue():
    """Create and return the ``StreamlitAVQueue`` instance."""
    from streamlit_av_queue import StreamlitAVQueue
    streamlit_av_queue_instance = StreamlitAVQueue()
    return streamlit_av_queue_instance


async def main():
    """Build the page, start the WebRTC streamer and poll the Charles actor.

    Returns early when the stream is not playing. Otherwise loops forever,
    pushing the "Look"/"Listen" checkbox values to the AV queue and
    rendering the actor's debug output and state every 0.1 s. Any
    exception escaping the loop is printed with its traceback and
    re-raised.
    """
    # Bind the module-level name so the __main__ handler can release it;
    # without this the assignment below would create an unrelated local.
    global webrtc_ctx

    # Initialize Ray
    init_ray()
    while not ray.is_initialized():
        await asyncio.sleep(0.1)
    # get ray actors
    charles_actor = get_charles_actor()
    await asyncio.sleep(0.1)
    streamlit_av_queue = get_streamlit_av_queue()
    await asyncio.sleep(0.1)

    st.title("Project Charles")

    col1, col2 = st.columns(2)

    with col1:
        nested_col1, nested_col2, nested_col3 = st.columns(3)
        with nested_col1:
            listening = st.checkbox("Listen", value=True)
        with nested_col2:
            looking = st.checkbox("Look", value=False)
        charles_actor_debug_output = st.empty()
        environment_state_ouput = st.empty()
        # with nested_col3:
        #     if st.button('Reboot Charles'):
        #         st.write('Killing RAY...')
        #         subprocess.check_output(["ray", "start", "--head"])
        #         st.write('Restarting RAY...')
        #         init_ray()
        #         charles_actor = None
        #         st.write('Reboot Charles')

    with col2:
        playing = st.checkbox("Playing", value=True)
        webrtc_ctx = webrtc_streamer(
            key="charles",
            desired_playing_state=playing,
            queued_audio_frames_callback=streamlit_av_queue.queued_audio_frames_callback,
            queued_video_frames_callback=streamlit_av_queue.queued_video_frames_callback,
            mode=WebRtcMode.SENDRECV,
            media_stream_constraints={
                "video": True,
                "audio": {
                    "sampleRate": 48000,
                    "sampleSize": 16,
                    "noiseSuppression": True,
                    "echoCancellation": True,
                    "channelCount": 1,
                }
            },
            rtc_configuration={"iceServers": get_ice_servers()},
            async_processing=True,
        )
        # NOTE(review): the flattened source does not show whether this
        # status line belongs inside col2 or below both columns -- confirm.
        system_one_audio_status = st.markdown("Initializing streaming")

    if not webrtc_ctx.state.playing:
        # The original had a bare `exit` here, which is a no-op expression;
        # actually stop so the polling loop is not entered while idle.
        return

    try:
        while True:
            if not webrtc_ctx.state.playing:
                system_one_audio_status.write("Camera has stopped.")
                await asyncio.sleep(0.1)
                continue
            if charles_actor is None:
                system_one_audio_status.write("Looking for Charles actor...")
                # NOTE(review): get_charles_actor is wrapped in
                # st.cache_resource, so this may hand back the same handle
                # that just failed -- confirm this is the intended recovery.
                charles_actor = get_charles_actor()
                if charles_actor is None:
                    await asyncio.sleep(0.1)
                    continue
                system_one_audio_status.write("Found Charles actor.")

            try:
                # new_environment_state = await charles_actor.get_environment_state.remote()
                # environment_state_ouput.markdown(f"{new_environment_state}")
                streamlit_av_queue.set_looking_listening(looking, listening)
                charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote()
                charles_actor_debug_output.markdown(charles_debug_str)
                state = await charles_actor.get_state.remote()
                system_one_audio_status.write(state)
            except Exception:
                # assume we disconnected
                charles_actor = None
            await asyncio.sleep(0.1)

    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
        raise e


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception:
        # main() has already printed the traceback; just drop our reference
        # to the WebRTC context.
        webrtc_ctx = None