import asyncio from collections import deque import os import sys import threading import time import traceback import av import numpy as np import streamlit as st from streamlit_webrtc import WebRtcMode, webrtc_streamer import pydub import torch # import av # import cv2 from sample_utils.turn import get_ice_servers import json from typing import List import subprocess import ray st.set_page_config(layout="wide") from vosk import SetLogLevel, Model, KaldiRecognizer SetLogLevel(-1) # mutes vosk verbosity from dotenv import load_dotenv load_dotenv() webrtc_ctx = None @st.cache_resource def init_ray(): try: subprocess.check_output(["ray", "start", "--include-dashboard=True", "--head"]) except Exception as e: print (f"ui_app.py init_ray: {e}") # Connect to a running Ray cluster while not ray.is_initialized(): time.sleep(0.1) ray_address = os.getenv('RAY_ADDRESS') if ray_address: ray.init(ray_address, namespace="project_charles") else: ray.init(namespace="project_charles") # @st.cache_resource # def get_charles_actor(): # charles_actor_instance = None # charles_actor_proc = subprocess.Popen([sys.executable, "charles_actor.py"]) # while charles_actor_instance == None: # try: # charles_actor_instance = ray.get_actor("CharlesApp") # except ValueError as e: # time.sleep(0.1) # give the subprocess a chance to start # return charles_actor_instance @st.cache_resource def get_streamlit_av_queue(): from streamlit_av_queue import StreamlitAVQueue streamlit_av_queue_instance = StreamlitAVQueue() return streamlit_av_queue_instance @st.cache_resource def get_app_interface_instance(): from app_interface_actor import AppInterfaceActor app_interface_instance = AppInterfaceActor.get_singleton() return app_interface_instance async def main(): # Initialize Ray ray_status = init_ray() while not ray.is_initialized(): await asyncio.sleep(0.1) # get ray actors app_interface_instance = get_app_interface_instance() await asyncio.sleep(0.1) streamlit_av_queue = get_streamlit_av_queue() await asyncio.sleep(0.1) st.title("Project Charles") col1, col2 = st.columns(2) with col1: nested_col1, nested_col2, nested_col3 = st.columns(3) with nested_col1: listening = st.checkbox("Listen", value=True) with nested_col2: looking = st.checkbox("Look", value=False) charles_actor_debug_output = st.empty() environment_state_ouput = st.empty() # with nested_col3: # if st.button('Reboot Charles'): # st.write('Killing RAY...') # subprocess.check_output(["ray", "start", "--head"]) # st.write('Restarting RAY...') # init_ray() # charles_actor = None # st.write('Reboot Charles') with col2: playing = st.checkbox("Playing", value=True) webrtc_ctx = webrtc_streamer( key="charles", desired_playing_state=playing, queued_audio_frames_callback=streamlit_av_queue.queued_audio_frames_callback, queued_video_frames_callback=streamlit_av_queue.queued_video_frames_callback, mode=WebRtcMode.SENDRECV, media_stream_constraints={ "video": True, "audio": { "sampleRate": 48000, "sampleSize": 16, "noiseSuppression": True, "echoCancellation": True, "channelCount": 1, } }, rtc_configuration={"iceServers": get_ice_servers()}, async_processing=True, ) system_one_audio_status = st.markdown("Initializing... may take some time (caching models, etc.)") if not webrtc_ctx.state.playing: exit try: while True: if not webrtc_ctx.state.playing: system_one_audio_status.write("Camera has stopped.") await asyncio.sleep(0.1) continue # if charles_actor is None: # system_one_audio_status.write("Looking for Charles actor...") # charles_actor = get_charles_actor() # if charles_actor is None: # await asyncio.sleep(0.1) # continue # system_one_audio_status.write("Found Charles actor.") try: streamlit_av_queue.set_looking_listening(looking, listening) charles_debug_str = await app_interface_instance.get_debug_output.remote() charles_actor_debug_output.markdown(charles_debug_str) state = await app_interface_instance.get_state.remote() system_one_audio_status.write(state) except Exception as e: # assume we disconnected charles_actor = None await asyncio.sleep(0.1) except Exception as e: print(f"An error occurred: {e}") traceback.print_exc() raise e if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: if webrtc_ctx is not None: del webrtc_ctx webrtc_ctx = None finally: pass