"""Streamlit UI for Project Charles.

The script connects to Ray, opens a WebRTC audio/video stream in the browser
via ``streamlit_webrtc``, makes sure the ``charles_app.py`` process is running
and then polls the app-interface actor for debug output and state to display.

NOTE(review): this file was reconstructed from a whitespace-flattened copy.
Statement order is preserved, but the nesting of a few layout calls is a best
guess and is marked with NOTE(review) comments below.
"""
import asyncio
from collections import deque
import os
import sys
import threading
import time
import traceback

import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
import torch
# import av
# import cv2
from sample_utils.turn import get_ice_servers
import json
from typing import List
import subprocess
import ray

# Kept ahead of the remaining imports on purpose: it is the first Streamlit
# call the script makes.
st.set_page_config(layout="wide")

from vosk import SetLogLevel, Model, KaldiRecognizer
SetLogLevel(-1) # mutes vosk verbosity

from dotenv import load_dotenv
load_dotenv()

# Shared with main() (declared ``global`` there) so the entry-point error
# handler at the bottom of the file can release the WebRTC context.
webrtc_ctx = None

# When DEBUG_CHARLES_APP=true the UI never spawns charles_app.py itself and
# instead waits for it to be started externally (e.g. from a debugger).
debug_charles_app = os.getenv('DEBUG_CHARLES_APP', 'false').lower() == 'true'

# Module-level default only; main() keeps its own local of the same name.
charles_app_running = False


@st.cache_resource
def init_ray():
    """Start a local Ray head node and connect this process to Ray.

    ``ray start --head`` is attempted first; any failure is printed and
    otherwise ignored. The function then loops until ``ray.is_initialized()``
    is true, calling ``ray.init`` with the ``project_charles`` namespace and
    with the ``RAY_ADDRESS`` environment variable as the address when it is
    set.

    Returns:
        None. The function is wrapped in ``st.cache_resource``.
    """
    try:
        subprocess.check_output(["ray", "start", "--include-dashboard=True", "--head"])
    except Exception as e:
        print(f"ui_app.py init_ray: {e}")

    # Connect to a running Ray cluster
    ray_address = os.getenv('RAY_ADDRESS')
    while not ray.is_initialized():
        time.sleep(0.1)
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")


@st.cache_resource
def get_streamlit_av_queue():
    """Create the ``StreamlitAVQueue`` used for the WebRTC frame callbacks.

    The import is performed inside the function, as in the original code.

    Returns:
        A new ``StreamlitAVQueue`` instance (wrapped in ``st.cache_resource``).
    """
    from streamlit_av_queue import StreamlitAVQueue
    return StreamlitAVQueue()


@st.cache_resource
def get_app_interface_instance():
    """Return the object produced by ``AppInterfaceActor.get_singleton()``.

    The import is performed inside the function, as in the original code.
    main() calls ``.remote()`` on this object's methods.
    """
    from app_interface_actor import AppInterfaceActor
    return AppInterfaceActor.get_singleton()


@st.cache_resource
def create_charles_app() -> int:
    """Launch ``charles_app.py`` with the current Python interpreter.

    Returns:
        The PID of the spawned process. The ``Popen`` handle itself is not
        kept.
    """
    charles_app_proc = subprocess.Popen([sys.executable, "charles_app.py"])
    return charles_app_proc.pid


async def main():
    """Build the page and run the status-polling loop.

    The loop never exits normally. On each pass it:

    * shows "Camera has stopped." while the WebRTC stream is not playing;
    * if the Charles app is not running, either waits for it to be started
      externally (``DEBUG_CHARLES_APP``) or spawns it and checks that its PID
      is the only one reported by the app-interface actor;
    * otherwise forwards the Look/Listen checkboxes to the AV queue and
      renders the actor's debug output and state.

    Raises:
        Exception: anything raised outside the inner polling ``try`` (for
            example the PID assertions) is printed with its traceback and
            re-raised.
    """
    # Bind the module-level name: without this the assignment below created a
    # local and the cleanup in the ``__main__`` handler never saw the context.
    global webrtc_ctx

    # Initialize Ray
    init_ray()

    # get ray actors
    app_interface_instance = get_app_interface_instance()
    await asyncio.sleep(0.1)
    streamlit_av_queue = get_streamlit_av_queue()
    await asyncio.sleep(0.1)

    st.title("Project Charles")

    col1, col2 = st.columns(2)

    with col1:
        nested_col1, nested_col2, nested_col3 = st.columns(3)
        with nested_col1:
            listening = st.checkbox("Listen", value=True)
        with nested_col2:
            looking = st.checkbox("Look", value=False)
        # NOTE(review): assumed to sit directly under col1 (below the checkbox
        # row) rather than inside nested_col2 - confirm against the original.
        charles_actor_debug_output = st.empty()
        # Placeholder is created but nothing writes to it yet.
        environment_state_output = st.empty()
        # with nested_col3:
        #     if st.button('Reboot Charles'):
        #         st.write('Killing RAY...')
        #         subprocess.check_output(["ray", "start", "--head"])
        #         st.write('Restarting RAY...')
        #         init_ray()
        #         charles_actor = None
        #         st.write('Reboot Charles')

    with col2:
        playing = st.checkbox("Playing", value=True)
        webrtc_ctx = webrtc_streamer(
            key="charles",
            desired_playing_state=playing,
            queued_audio_frames_callback=streamlit_av_queue.queued_audio_frames_callback,
            queued_video_frames_callback=streamlit_av_queue.queued_video_frames_callback,
            mode=WebRtcMode.SENDRECV,
            media_stream_constraints={
                "video": True,
                "audio": {
                    "sampleRate": 48000,
                    "sampleSize": 16,
                    "noiseSuppression": True,
                    "echoCancellation": True,
                    "channelCount": 1,
                }
            },
            rtc_configuration={"iceServers": get_ice_servers()},
            async_processing=True,
        )
        # NOTE(review): assumed to render inside col2, under the stream.
        system_one_audio_status = st.markdown("Initializing... may take some time (caching models, etc.)")

    # The original had ``if not webrtc_ctx.state.playing: exit`` here. ``exit``
    # was referenced but never called, so the statement did nothing; the
    # not-playing case is handled inside the loop below.

    # repr() of the last error from the polling block, so a persistent failure
    # is reported once instead of on every ~10 ms iteration.
    last_poll_error = None

    try:
        while True:
            charles_app_running = await app_interface_instance.is_charles_app_running.remote()

            if not webrtc_ctx.state.playing:
                system_one_audio_status.write("Camera has stopped.")
                await asyncio.sleep(0.1)
                continue

            if not charles_app_running and debug_charles_app:
                system_one_audio_status.write("Start Charles app from your debugger...")
                await asyncio.sleep(0.1)
                continue

            if not charles_app_running:
                print("Starting Charles app...")
                system_one_audio_status.write("Starting Charles app...")
                charles_app_pid = create_charles_app()
                print(f"Started Charles app with PID {charles_app_pid}")
                await asyncio.sleep(0.1)
                pids = await app_interface_instance.get_charles_app_pids.remote()
                assert charles_app_pid in pids, f"Charles app PID {charles_app_pid} not found in {pids}"
                assert len(pids) == 1, f"Expected 1 PID, found {len(pids)}"
                continue

            try:
                streamlit_av_queue.set_looking_listening(looking, listening)
                charles_debug_str = await app_interface_instance.get_debug_output.remote()
                charles_actor_debug_output.markdown(charles_debug_str)
                state = await app_interface_instance.get_state.remote()
                system_one_audio_status.write(state)
                last_poll_error = None
            except Exception as e:
                # Assume we disconnected: keep polling, but surface each
                # distinct error once rather than swallowing it silently.
                error_text = repr(e)
                if error_text != last_poll_error:
                    print(f"ui_app.py main: status poll failed: {error_text}")
                    last_poll_error = error_text
            # NOTE(review): assumed to run on every iteration (after the
            # try/except), not only on the failure path.
            await asyncio.sleep(0.01)

    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
        raise


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception:
        # main() has already printed the error and traceback; here we only
        # drop our reference to the WebRTC context.
        if webrtc_ctx is not None:
            webrtc_ctx = None