import asyncio from collections import deque import os import threading import time import traceback import av import numpy as np import streamlit as st from streamlit_webrtc import WebRtcMode, webrtc_streamer import pydub import torch # import av # import cv2 from sample_utils.turn import get_ice_servers import json from typing import List st.set_page_config(layout="wide") from vosk import SetLogLevel, Model, KaldiRecognizer SetLogLevel(-1) # mutes vosk verbosity from dotenv import load_dotenv load_dotenv() webrtc_ctx = None # Initialize Ray import ray if not ray.is_initialized(): import subprocess try: subprocess.check_output(["ray", "start", "--head"]) except Exception as e: print (e) # Try to connect to a running Ray cluster ray_address = os.getenv('RAY_ADDRESS') if ray_address: ray.init(ray_address, namespace="project_charles") else: ray.init(namespace="project_charles") import platform print (f"platform:{platform.system()}") if platform.system() == 'Linux': charles_actor_proc = subprocess.Popen(["python3", "charles_actor.py"]) async def main(): st.title("Project Charles") col1, col2 = st.columns(2) with col1: nested_col1, nested_col2, nested_col3 = st.columns(3) with nested_col1: listening = st.checkbox("Listen", value=True) with nested_col2: looking = st.checkbox("Look", value=False) charles_actor_debug_output = st.empty() environment_state_ouput = st.empty() with nested_col3: if st.button('Reboot Server'): st.write('Rebooting...') subprocess.run(["sudo", "reboot"]) with col2: if "streamlit_av_queue" not in st.session_state: from streamlit_av_queue import StreamlitAVQueue st.session_state.streamlit_av_queue = StreamlitAVQueue() playing = st.checkbox("Playing", value=True) webrtc_ctx = webrtc_streamer( key="charles", desired_playing_state=playing, queued_audio_frames_callback=st.session_state.streamlit_av_queue.queued_audio_frames_callback, queued_video_frames_callback=st.session_state.streamlit_av_queue.queued_video_frames_callback, mode=WebRtcMode.SENDRECV, media_stream_constraints={ "video": True, "audio": { "sampleRate": 48000, "sampleSize": 16, "noiseSuppression": True, "echoCancellation": True, "channelCount": 1, } }, rtc_configuration={"iceServers": get_ice_servers()}, async_processing=True, ) system_one_audio_status = st.markdown("Initializing streaming") if not webrtc_ctx.state.playing: exit from charles_actor import CharlesActor charles_actor = None try: while True: if "streamlit_av_queue" in st.session_state: st.session_state.streamlit_av_queue.set_looking_listening(looking, listening) if not webrtc_ctx.state.playing: system_one_audio_status.write("Stopped.") await asyncio.sleep(0.1) continue if charles_actor is None: try: charles_actor = ray.get_actor("CharlesActor") system_one_audio_status.write("Charles is here.") except ValueError as e: system_one_audio_status.write("Charles is sleeping.") pass if charles_actor is not None: try: # new_environment_state = await charles_actor.get_environment_state.remote() # environment_state_ouput.markdown(f"{new_environment_state}") charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote() charles_actor_debug_output.markdown(charles_debug_str) except Exception as e: # assume we disconnected charles_actor = None await asyncio.sleep(0.1) except Exception as e: print(f"An error occurred: {e}") traceback.print_exc() raise e if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: if webrtc_ctx is not None: del webrtc_ctx webrtc_ctx = None if "streamlit_av_queue" in st.session_state: del st.session_state.streamlit_av_queue finally: pass