"""Streamlit entry point: stream WebRTC audio/video and show Charles' output.

The script starts a ``streamlit_webrtc`` SENDRECV stream whose queued audio
and video frames are handed to a ``StreamlitAVQueue`` kept in the Streamlit
session state.  It then polls the Ray actor named ``CharlesActor`` and renders
whatever ``get_system_one_audio_history_output`` returns as markdown.
"""
# NOTE(review): several imports below are not referenced in this file. They
# are kept because they may be needed for import-time side effects or by code
# outside this view -- confirm before removing any of them.
import asyncio
from collections import deque
import json
import os
import threading
import time
import traceback
from typing import List

import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
import torch
from vosk import SetLogLevel, Model, KaldiRecognizer
from dotenv import load_dotenv
import ray

from sample_utils.turn import get_ice_servers

RAY_NAMESPACE = "project_charles"
CHARLES_ACTOR_NAME = "CharlesActor"
POLL_INTERVAL_SECONDS = 0.1

st.set_page_config(layout="wide")

SetLogLevel(-1)  # mutes vosk verbosity

# Load .env before RAY_ADDRESS is read below.
load_dotenv()

# Module-level handle to the active stream context; main() assigns it so the
# cleanup in the __main__ guard can release it after a failure.
webrtc_ctx = None

# Initialize Ray
if not ray.is_initialized():
    # Try to connect to a running Ray cluster
    ray_address = os.getenv("RAY_ADDRESS")
    if ray_address:
        ray.init(ray_address, namespace=RAY_NAMESPACE)
    else:
        ray.init(namespace=RAY_NAMESPACE)


def _build_ui(status):
    """Lay out the page, make sure the frame queue exists, start the stream.

    Args:
        status: Streamlit placeholder used for one-line progress messages.

    Returns:
        A ``(listening, history_output, ctx)`` tuple: the value of the
        "Listen" checkbox, the placeholder that will show the audio history,
        and the context object returned by ``webrtc_streamer``.
    """
    col1, col2 = st.columns(2)

    with col1:
        listening = st.checkbox("Listen", value=True)
        history_output = st.empty()

    # Initialize resources if not already done
    status.write("Initializing streaming")
    if "streamlit_av_queue" not in st.session_state:
        from streamlit_av_queue import StreamlitAVQueue

        st.session_state.streamlit_av_queue = StreamlitAVQueue()
    status.write("resources referecned")

    status.write("Initializing webrtc_streamer")
    av_queue = st.session_state.streamlit_av_queue
    with col2:
        playing = st.checkbox("Playing", value=True)
        ctx = webrtc_streamer(
            key="charles",
            desired_playing_state=playing,
            queued_audio_frames_callback=av_queue.queued_audio_frames_callback,
            queued_video_frames_callback=av_queue.queued_video_frames_callback,
            mode=WebRtcMode.SENDRECV,
            media_stream_constraints={
                "video": True,
                "audio": {
                    "sampleRate": 48000,
                    "sampleSize": 16,
                    "noiseSuppression": True,
                    "echoCancellation": True,
                    "channelCount": 1,
                },
            },
            rtc_configuration={"iceServers": get_ice_servers()},
            async_processing=True,
        )

    # The original had a bare ``exit`` expression here when the stream was not
    # playing; it was a no-op, and the polling loop handles that state itself.
    return listening, history_output, ctx


async def _poll_charles(status, history_output, ctx, listening):
    """Poll the Charles actor forever and mirror its audio history to the UI.

    Args:
        status: Streamlit placeholder used for one-line status messages.
        history_output: Streamlit placeholder that receives the history as
            markdown.
        ctx: Context returned by ``webrtc_streamer``; only ``ctx.state.playing``
            is read.
        listening: Value forwarded to the queue's ``set_listening`` on every
            iteration.
    """
    charles_actor = None
    while True:
        if "streamlit_av_queue" in st.session_state:
            st.session_state.streamlit_av_queue.set_listening(listening)

        if not ctx.state.playing:
            status.write("Stopped.")
            await asyncio.sleep(POLL_INTERVAL_SECONDS)
            continue

        if charles_actor is None:
            try:
                charles_actor = ray.get_actor(CHARLES_ACTOR_NAME)
                status.write("Charles is here.")
            except ValueError:
                # No actor registered under that name yet; retry next tick.
                status.write("Charles is sleeping.")

        if charles_actor is not None:
            try:
                audio_history = (
                    await charles_actor.get_system_one_audio_history_output.remote()
                )
                history_output.markdown(audio_history)
            except Exception:
                # assume we disconnected; look the actor up again next tick
                charles_actor = None

        await asyncio.sleep(POLL_INTERVAL_SECONDS)


async def main():
    """Build the page, start streaming, then poll the Charles actor.

    Runs until an exception escapes; the exception is printed together with
    its traceback and re-raised.

    Raises:
        Exception: whatever the UI setup or the polling loop raised.
    """
    # Without this declaration the assignment below would create a local and
    # the module-level cleanup would only ever see ``None``.
    global webrtc_ctx

    status = st.empty()
    status.write("Initializing streaming")

    try:
        listening, history_output, webrtc_ctx = _build_ui(status)

        status.write("Initializing speech")
        # NOTE(review): the name is unused here; the import is kept in case
        # loading the module matters -- confirm.
        from charles_actor import CharlesActor  # noqa: F401

        await _poll_charles(status, history_output, webrtc_ctx, listening)
    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
        raise


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception:
        # main() has already printed the error. Drop the stream context and
        # the session's frame queue; the queue is recreated by the
        # "not in st.session_state" check the next time the script runs.
        webrtc_ctx = None
        if "streamlit_av_queue" in st.session_state:
            del st.session_state.streamlit_av_queue