import streamlit as st
import os, glob, re, base64, asyncio, requests
from datetime import datetime
from xml.etree import ElementTree as ET
import edge_tts
import streamlit.components.v1 as components
# -------------------- Configuration --------------------
USER_NAMES = [
    "Aria", "Guy", "Sonia", "Tony", "Jenny", "Davis", "Libby", "Clara", "Liam", "Natasha", "William"
]
ENGLISH_VOICES = [
    "en-US-AriaNeural", "en-US-GuyNeural", "en-GB-SoniaNeural", "en-GB-TonyNeural",
    "en-US-JennyNeural", "en-US-DavisNeural", "en-GB-LibbyNeural", "en-CA-ClaraNeural",
    "en-CA-LiamNeural", "en-AU-NatashaNeural", "en-AU-WilliamNeural"
]
USER_VOICES = dict(zip(USER_NAMES, ENGLISH_VOICES))

SAVED_INPUTS_DIR = "saved_inputs"
os.makedirs(SAVED_INPUTS_DIR, exist_ok=True)

if 'user_name' not in st.session_state:
    st.session_state['user_name'] = USER_NAMES[0]
if 'old_val' not in st.session_state:
    st.session_state['old_val'] = None
if 'should_rerun' not in st.session_state:
    st.session_state['should_rerun'] = False
if 'viewing_prefix' not in st.session_state:
    st.session_state['viewing_prefix'] = None
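
# -------------------- Text & TTS helpers --------------------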
def clean_for_speech(text: str) -> str:
    text = text.replace("\n", " ")
    text = text.replace("</s>", " ")
    text = text.replace("#", "")
    text = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text
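
# Generate speech with edge-tts and save it to a timestamped MP3 file.
# Returns the output filename, or None if the text is empty or no audio is received.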
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural"):
    text = clean_for_speech(text)
    if not text.strip():
        return None
    communicate = edge_tts.Communicate(text, voice)
    # Include microseconds so files generated within the same second don't collide
    out_fn = f"speech_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}.mp3"
    try:
        await communicate.save(out_fn)
    except edge_tts.exceptions.NoAudioReceived:
        st.error("No audio received from TTS service.")
        return None
    return out_fn
def speak_with_edge_tts(text, voice="en-US-AriaNeural"):
    return asyncio.run(edge_tts_generate_audio(text, voice))

def play_and_download_audio(file_path):
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        with open(file_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode()
        fname = os.path.basename(file_path)
        dl_link = f'<a href="data:audio/mpeg;base64,{b64}" download="{fname}">Download {fname}</a>'
        st.markdown(dl_link, unsafe_allow_html=True)
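
# -------------------- Saved-input persistence --------------------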
def save_input_as_md(user_name, text, prefix="input"):
    if not text.strip():
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_text = re.sub(r'[^\w\s-]', '', text[:50]).strip().lower()
    safe_text = re.sub(r'[-\s]+', '-', safe_text)
    fn = f"{prefix}_{timestamp}_{safe_text}.md"
    full_path = os.path.join(SAVED_INPUTS_DIR, fn)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(f"# User: {user_name}\n")
        f.write(f"**Timestamp:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write(text)
    return full_path

def list_saved_inputs():
    files = sorted(glob.glob(os.path.join(SAVED_INPUTS_DIR, "*.md")))
    return files
def parse_md_file(fpath):
    user_line = ""
    ts_line = ""
    content_lines = []
    with open(fpath, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    for line in lines:
        if line.startswith("# User:"):
            user_line = line.replace("# User:", "").strip()
        elif line.startswith("**Timestamp:**"):
            ts_line = line.replace("**Timestamp:**", "").strip()
        else:
            content_lines.append(line.strip())
    content = "\n".join(content_lines).strip()
    return user_line, ts_line, content
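
# -------------------- ArXiv search --------------------
# Query the public arXiv Atom API (http://export.arxiv.org/api/query) and
# format a short text summary for each result entry.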
def arxiv_search(query, max_results=3):
    base_url = "http://export.arxiv.org/api/query"
    params = {
        'search_query': query,  # requests URL-encodes the query string
        'start': 0,
        'max_results': max_results
    }
    response = requests.get(base_url, params=params, timeout=30)
    if response.status_code == 200:
        root = ET.fromstring(response.text)
        ns = {"a": "http://www.w3.org/2005/Atom"}
        entries = root.findall('a:entry', ns)
        results = []
        for entry in entries:
            title = entry.find('a:title', ns).text.strip()
            summary = entry.find('a:summary', ns).text.strip()
            # Extract the PDF link if available
            links = entry.findall('a:link', ns)
            pdf_link = None
            for link in links:
                if link.get('type') == 'application/pdf':
                    pdf_link = link.get('href')
            summary_short = summary[:300] + "..." if len(summary) > 300 else summary
            # Include PDF link and title
            if pdf_link:
                formatted = f"Title: {title}\nPDF: {pdf_link}\nSummary: {summary_short}"
            else:
                formatted = f"Title: {title}\n(No PDF link)\nSummary: {summary_short}"
            results.append(formatted)
        return results
    return []
def summarize_arxiv_results(results):
    if not results:
        return "No results found."
    return "\n\n".join(results)
def concatenate_mp3(files, output_file):
    with open(output_file, 'wb') as outfile:
        for f in files:
            with open(f, 'rb') as infile:
                outfile.write(infile.read())
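
# -------------------- UI --------------------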
st.title("🎙️ Voice Chat & ArXiv Search")

with st.sidebar:
    st.session_state['user_name'] = st.selectbox("Current User:", USER_NAMES, index=0)
    saved_files = list_saved_inputs()
    st.write("📂 Saved Inputs:")
    for fpath in saved_files:
        user, ts, content = parse_md_file(fpath)
        fname = os.path.basename(fpath)
        st.write(f"- {fname} (User: {user})")
    if st.button("🗑️ Clear All History"):
        for fpath in saved_files:
            os.remove(fpath)
        st.session_state['viewing_prefix'] = None
        st.success("All history cleared!")
        st.rerun()
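
# Custom speech-to-text component, expected in ./mycomponent; it is assumed to
# return the recognized transcript as a string.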
mycomponent = components.declare_component("mycomponent", path="mycomponent")
voice_val = mycomponent(my_input_value="Start speaking...")

tabs = st.tabs(["🎤 Voice Chat", "💾 History", "⚙️ Settings"])
# ------------------ Voice Chat Tab -------------------------
with tabs[0]:
    st.subheader("🎤 Voice Chat")
    if voice_val:
        voice_text = voice_val.strip()
        input_changed = (voice_text != st.session_state.get('old_val'))
        if input_changed and voice_text:
            # 1. Save user input
            save_input_as_md(st.session_state['user_name'], voice_text, prefix="input")
            # 2. Perform ArXiv search automatically
            with st.spinner("Searching ArXiv..."):
                results = arxiv_search(voice_text)
                summary = summarize_arxiv_results(results)
            # Save the search summary as a response
            save_input_as_md(st.session_state['user_name'], summary, prefix="arxiv")
            st.write(summary)
            # 3. Convert summary to audio and auto-play
            voice = USER_VOICES.get(st.session_state['user_name'], "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(summary, voice=voice)
            if audio_file:
                play_and_download_audio(audio_file)
            # 4. Update old_val to avoid repeated searches for the same input
            st.session_state['old_val'] = voice_text
            # 5. Clear displayed text and re-run so the next utterance starts fresh
            st.rerun()
    st.write("Speak a query to automatically run an ArXiv search and read the results aloud.")
# ------------------ History Tab -------------------------
with tabs[1]:
    st.subheader("💾 History")
    files = list_saved_inputs()
    conversation = []
    for fpath in files:
        user, ts, content = parse_md_file(fpath)
        conversation.append((user, ts, content, fpath))
    for i, (user, ts, content, fpath) in enumerate(reversed(conversation), start=1):
        with st.expander(f"{ts} - {user}", expanded=False):
            st.write(content)
            if st.button(f"🔊 Read Aloud {ts}-{user}", key=f"read_{i}_{fpath}"):
                voice = USER_VOICES.get(user, "en-US-AriaNeural")
                audio_file = speak_with_edge_tts(content, voice=voice)
                if audio_file:
                    play_and_download_audio(audio_file)
    if st.button("🔊 Read Entire Conversation"):
        conversation_chrono = conversation  # files are already sorted oldest to newest
        mp3_files = []
        for user, ts, content, fpath in conversation_chrono:
            voice = USER_VOICES.get(user, "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(content, voice=voice)
            if audio_file:
                mp3_files.append(audio_file)
                st.write(f"**{user} ({ts}):**")
                play_and_download_audio(audio_file)
        if mp3_files:
            combined_file = f"full_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            concatenate_mp3(mp3_files, combined_file)
            st.write("**Full Conversation Audio:**")
            play_and_download_audio(combined_file)
# ------------------ Settings Tab -------------------------
with tabs[2]:
    st.subheader("⚙️ Settings")
    st.write("Currently no additional settings. Use the sidebar to pick a user.")

if st.session_state.should_rerun:
    st.session_state.should_rerun = False
    st.rerun()