"""Streamlit app: speak a query, search arXiv, and read the results aloud.

The script keeps a small on-disk history of inputs/responses as Markdown files
in ``saved_inputs/`` and uses edge-tts to synthesize speech with a voice that
is mapped to the currently selected user name.
"""

import asyncio
import base64
import glob
import os
import re
from collections import defaultdict
from datetime import datetime
from urllib.parse import quote
from xml.etree import ElementTree as ET

import edge_tts
import requests
import streamlit as st
import streamlit.components.v1 as components

# -------------------- Configuration --------------------
USER_NAMES = [
    "Aria", "Guy", "Sonia", "Tony", "Jenny", "Davis",
    "Libby", "Clara", "Liam", "Natasha", "William",
]

ENGLISH_VOICES = [
    "en-US-AriaNeural", "en-US-GuyNeural", "en-GB-SoniaNeural",
    "en-GB-TonyNeural", "en-US-JennyNeural", "en-US-DavisNeural",
    "en-GB-LibbyNeural", "en-CA-ClaraNeural", "en-CA-LiamNeural",
    "en-AU-NatashaNeural", "en-AU-WilliamNeural",
]

# Each user name is paired positionally with one edge-tts voice id.
USER_VOICES = dict(zip(USER_NAMES, ENGLISH_VOICES))

SAVED_INPUTS_DIR = "saved_inputs"
os.makedirs(SAVED_INPUTS_DIR, exist_ok=True)

# Seed the session-state keys used below so later reads never fail.
if 'user_name' not in st.session_state:
    st.session_state['user_name'] = USER_NAMES[0]
if 'old_val' not in st.session_state:
    st.session_state['old_val'] = None
if 'should_rerun' not in st.session_state:
    st.session_state['should_rerun'] = False
if 'viewing_prefix' not in st.session_state:
    st.session_state['viewing_prefix'] = None


def clean_for_speech(text: str) -> str:
    """Return *text* flattened to a single line suitable for TTS.

    Newlines become spaces, ``#`` characters are dropped, parenthesised
    http(s) URLs are removed, and runs of whitespace are collapsed.
    """
    text = text.replace("\n", " ")
    # The previous code replaced the *empty string* with a space, which puts
    # a space between every character of the text. The literal looks like a
    # markup-stripped "</s>" end-of-sequence token.
    # NOTE(review): confirm "</s>" is the intended token.
    text = text.replace("</s>", " ")
    text = text.replace("#", "")
    text = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text


async def edge_tts_generate_audio(text, voice="en-US-AriaNeural"):
    """Synthesize *text* with edge-tts and return the MP3 file name.

    Returns ``None`` when the cleaned text is empty or the service returns
    no audio (an error is shown in the UI in the latter case).
    """
    text = clean_for_speech(text)
    if not text.strip():
        return None
    communicate = edge_tts.Communicate(text, voice)
    # Microseconds are part of the name so that several syntheses within the
    # same second (e.g. "Read Entire Conversation") do not overwrite each other.
    out_fn = f"speech_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp3"
    try:
        await communicate.save(out_fn)
    except edge_tts.exceptions.NoAudioReceived:
        st.error("No audio received from TTS service.")
        return None
    return out_fn


def speak_with_edge_tts(text, voice="en-US-AriaNeural"):
    """Synchronous wrapper around :func:`edge_tts_generate_audio`."""
    return asyncio.run(edge_tts_generate_audio(text, voice))


def play_and_download_audio(file_path):
    """Render an audio player and a download link for *file_path*.

    Does nothing when *file_path* is falsy or the file does not exist.
    """
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        # Embed the file as a base64 data URI so the link works without a
        # separate file server. File names are generated by this app, so they
        # are inserted into the HTML as-is.
        with open(file_path, "rb") as audio:
            encoded = base64.b64encode(audio.read()).decode()
        name = os.path.basename(file_path)
        dl_link = (
            f'<a href="data:audio/mpeg;base64,{encoded}" '
            f'download="{name}">Download {name}</a>'
        )
        st.markdown(dl_link, unsafe_allow_html=True)


def save_input_as_md(user_name, text, prefix="input"):
    """Write *text* to a timestamped Markdown file and return its path.

    The file starts with a ``# User:`` line and a ``**Timestamp:**`` line,
    which :func:`parse_md_file` reads back. Returns ``None`` (and writes
    nothing) when *text* is blank.
    """
    if not text.strip():
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Slug built from the first 50 characters: keep word chars, collapse
    # whitespace/hyphens into single hyphens.
    safe_text = re.sub(r'[^\w\s-]', '', text[:50]).strip().lower()
    safe_text = re.sub(r'[-\s]+', '-', safe_text)
    fn = f"{prefix}_{timestamp}_{safe_text}.md"
    full_path = os.path.join(SAVED_INPUTS_DIR, fn)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(f"# User: {user_name}\n")
        f.write(f"**Timestamp:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write(text)
    return full_path


def list_saved_inputs():
    """Return the saved ``*.md`` paths sorted by file name.

    NOTE(review): because names start with the prefix ("arxiv_"/"input_"),
    this order is alphabetical by prefix first, not strictly chronological.
    """
    files = sorted(glob.glob(os.path.join(SAVED_INPUTS_DIR, "*.md")))
    return files


def parse_md_file(fpath):
    """Parse a saved Markdown file into ``(user, timestamp, content)``.

    Lines starting with ``# User:`` or ``**Timestamp:**`` populate the first
    two fields; every other line (stripped) is joined into ``content``.
    """
    user_line = ""
    ts_line = ""
    content_lines = []
    with open(fpath, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    for line in lines:
        if line.startswith("# User:"):
            user_line = line.replace("# User:", "").strip()
        elif line.startswith("**Timestamp:**"):
            ts_line = line.replace("**Timestamp:**", "").strip()
        else:
            content_lines.append(line.strip())
    content = "\n".join(content_lines).strip()
    return user_line, ts_line, content


def arxiv_search(query, max_results=3):
    """Query the arXiv Atom API and return a list of formatted result strings.

    Each result contains the title, the PDF link (when one is advertised) and
    the first 300 characters of the abstract. Returns ``[]`` on a non-200
    response, a network failure, or an unparsable payload; the latter two
    are also reported in the UI.
    """
    base_url = "http://export.arxiv.org/api/query"
    params = {
        # Pass the raw query: requests URL-encodes parameter values itself,
        # so pre-replacing spaces with '+' would send literal "%2B" instead.
        'search_query': query,
        'start': 0,
        'max_results': max_results,
    }
    try:
        response = requests.get(base_url, params=params, timeout=30)
    except requests.RequestException as exc:
        st.error(f"ArXiv request failed: {exc}")
        return []
    if response.status_code != 200:
        return []

    try:
        root = ET.fromstring(response.text)
    except ET.ParseError as exc:
        st.error(f"Could not parse ArXiv response: {exc}")
        return []

    ns = {"a": "http://www.w3.org/2005/Atom"}
    results = []
    for entry in root.findall('a:entry', ns):
        # findtext() yields '' instead of raising when an element is missing.
        title = entry.findtext('a:title', default='', namespaces=ns).strip()
        summary = entry.findtext('a:summary', default='', namespaces=ns).strip()

        # Extract links (PDF) if available; the last matching link wins.
        pdf_link = None
        for link in entry.findall('a:link', ns):
            if link.get('type') == 'application/pdf':
                pdf_link = link.get('href')

        summary_short = summary[:300] + "..."

        # Include PDF link and title
        if pdf_link:
            formatted = f"Title: {title}\nPDF: {pdf_link}\nSummary: {summary_short}"
        else:
            formatted = f"Title: {title}\n(No PDF link)\nSummary: {summary_short}"
        results.append(formatted)
    return results


def summarize_arxiv_results(results):
    """Join formatted results with blank lines, or report that none exist."""
    if not results:
        return "No results found."
    return "\n\n".join(results)


def concatenate_mp3(files, output_file):
    """Write the raw bytes of each file in *files*, in order, to *output_file*."""
    with open(output_file, 'wb') as outfile:
        for f in files:
            with open(f, 'rb') as infile:
                outfile.write(infile.read())


st.title("🎙️ Voice Chat & ArXiv Search")

with st.sidebar:
    st.session_state['user_name'] = st.selectbox("Current User:", USER_NAMES, index=0)

    saved_files = list_saved_inputs()
    st.write("📝 Saved Inputs:")
    for fpath in saved_files:
        user, ts, content = parse_md_file(fpath)
        fname = os.path.basename(fpath)
        st.write(f"- {fname} (User: {user})")

    if st.button("🗑️ Clear All History"):
        for fpath in saved_files:
            os.remove(fpath)
        st.session_state['viewing_prefix'] = None
        st.success("All history cleared!")
        # st.rerun() matches the call at the bottom of this script.
        st.rerun()

# Custom front-end component (served from ./mycomponent) that returns the
# recognized speech text.
mycomponent = components.declare_component("mycomponent", path="mycomponent")
voice_val = mycomponent(my_input_value="Start speaking...")

tabs = st.tabs(["🎤 Voice Chat", "💾 History", "⚙️ Settings"])

# ------------------ Voice Chat Tab -------------------------
with tabs[0]:
    st.subheader("🎤 Voice Chat")
    if voice_val:
        voice_text = voice_val.strip()
        input_changed = (voice_text != st.session_state.get('old_val'))
        if input_changed and voice_text:
            # 1. Save user input
            save_input_as_md(st.session_state['user_name'], voice_text, prefix="input")

            # 2. Perform ArXiv search automatically
            with st.spinner("Searching ArXiv..."):
                results = arxiv_search(voice_text)
                summary = summarize_arxiv_results(results)

            # Save as response
            save_input_as_md(st.session_state['user_name'], summary, prefix="arxiv")
            st.write(summary)

            # 3. Convert summary to audio and auto-play
            voice = USER_VOICES.get(st.session_state['user_name'], "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(summary, voice=voice)
            if audio_file:
                play_and_download_audio(audio_file)

            # 4. Update old_val to avoid repeated searches for same input
            st.session_state['old_val'] = voice_text

            # 5. Clear displayed text and re-run so next utterance starts fresh
            # NOTE(review): this rerun also discards the summary and audio
            # player rendered just above; confirm that is intended.
            st.rerun()

    st.write("Speak a query to automatically run an ArXiv search and read results aloud.")

# ------------------ History Tab -------------------------
with tabs[1]:
    st.subheader("💾 History")
    files = list_saved_inputs()
    conversation = []
    for fpath in files:
        user, ts, content = parse_md_file(fpath)
        conversation.append((user, ts, content, fpath))

    # Entries are shown in reverse of the file-name sort order.
    for i, (user, ts, content, fpath) in enumerate(reversed(conversation), start=1):
        with st.expander(f"{ts} - {user}", expanded=False):
            st.write(content)
            if st.button(f"🔊 Read Aloud {ts}-{user}", key=f"read_{i}_{fpath}"):
                voice = USER_VOICES.get(user, "en-US-AriaNeural")
                audio_file = speak_with_edge_tts(content, voice=voice)
                if audio_file:
                    play_and_download_audio(audio_file)

    if st.button("📜 Read Entire Conversation"):
        # NOTE(review): despite the name, this is the reverse of the
        # file-name sort order; verify the intended playback order.
        conversation_chrono = list(reversed(conversation))
        mp3_files = []
        for user, ts, content, fpath in conversation_chrono:
            voice = USER_VOICES.get(user, "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(content, voice=voice)
            if audio_file:
                mp3_files.append(audio_file)
                st.write(f"**{user} ({ts}):**")
                play_and_download_audio(audio_file)

        if mp3_files:
            combined_file = f"full_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            concatenate_mp3(mp3_files, combined_file)
            st.write("**Full Conversation Audio:**")
            play_and_download_audio(combined_file)

# ------------------ Settings Tab -------------------------
with tabs[2]:
    st.subheader("⚙️ Settings")
    st.write("Currently no additional settings. Use the sidebar to pick a user.")

if st.session_state.should_rerun:
    st.session_state.should_rerun = False
    st.rerun()