CodeCompetitionClaudeVsGPT / backup11.app.py
awacke1's picture
Rename app.py to backup11.app.py
6af69d9 verified
raw
history blame
9.34 kB
import streamlit as st
import os, glob, re, base64, asyncio, requests
from datetime import datetime
from collections import defaultdict
from urllib.parse import quote
from xml.etree import ElementTree as ET
import edge_tts
import streamlit.components.v1 as components
# -------------------- Configuration --------------------
# Display name -> voice identifier handed to edge_tts. Insertion order is the
# order in which names are offered in the sidebar selectbox.
USER_VOICES = {
    "Aria": "en-US-AriaNeural",
    "Guy": "en-US-GuyNeural",
    "Sonia": "en-GB-SoniaNeural",
    "Tony": "en-GB-TonyNeural",
    "Jenny": "en-US-JennyNeural",
    "Davis": "en-US-DavisNeural",
    "Libby": "en-GB-LibbyNeural",
    "Clara": "en-CA-ClaraNeural",
    "Liam": "en-CA-LiamNeural",
    "Natasha": "en-AU-NatashaNeural",
    "William": "en-AU-WilliamNeural",
}
# Parallel lists derived from the mapping so the three always stay in sync.
USER_NAMES = list(USER_VOICES)
ENGLISH_VOICES = list(USER_VOICES.values())

# Directory holding one markdown file per saved utterance / search response.
SAVED_INPUTS_DIR = "saved_inputs"
os.makedirs(SAVED_INPUTS_DIR, exist_ok=True)
# Seed per-session defaults. Keys that already exist are left untouched so
# values survive Streamlit's script reruns.
_SESSION_DEFAULTS = {
    'user_name': USER_NAMES[0],
    'old_val': None,          # last processed voice input (see Voice Chat tab)
    'should_rerun': False,    # checked at the very end of the script
    'viewing_prefix': None,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
def clean_for_speech(text: str) -> str:
    """Flatten *text* into a single line suitable for text-to-speech.

    Newlines and ``</s>`` markers become spaces, ``#`` characters are removed,
    parenthesised http(s) URLs (the target half of a markdown link) are
    dropped, and every run of whitespace is collapsed to one space.

    Args:
        text: Raw text, possibly containing markdown.

    Returns:
        The cleaned text with no leading or trailing whitespace.
    """
    # Order matters: "</s>" is replaced before "#" is removed, so a sequence
    # such as "<#/s>" is not turned into a marker and then swallowed.
    for token, substitute in (("\n", " "), ("</s>", " "), ("#", "")):
        text = text.replace(token, substitute)
    without_links = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    return re.sub(r"\s+", " ", without_links).strip()
async def edge_tts_generate_audio(text, voice="en-US-AriaNeural"):
    """Synthesise *text* to an MP3 file with edge_tts.

    Args:
        text: Text to speak; it is passed through ``clean_for_speech`` first.
        voice: Voice identifier handed to ``edge_tts.Communicate``.

    Returns:
        The name of the MP3 file written to the current working directory, or
        ``None`` when the cleaned text is empty or the service returned no
        audio (in which case an error is shown in the Streamlit UI).
    """
    text = clean_for_speech(text)
    if not text:  # clean_for_speech already strips, so empty means nothing to say
        return None
    communicate = edge_tts.Communicate(text, voice)
    # Microsecond resolution: with a seconds-only stamp, several clips generated
    # within the same second would silently overwrite one another.
    out_fn = f"speech_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp3"
    try:
        await communicate.save(out_fn)
    except edge_tts.exceptions.NoAudioReceived:
        st.error("No audio received from TTS service.")
        return None
    return out_fn
def speak_with_edge_tts(text, voice="en-US-AriaNeural"):
    """Blocking wrapper around ``edge_tts_generate_audio``.

    Runs the coroutine to completion with ``asyncio.run`` and returns whatever
    it returns (an MP3 file name, or ``None``).
    """
    synthesis = edge_tts_generate_audio(text, voice)
    return asyncio.run(synthesis)
def play_and_download_audio(file_path):
    """Render an audio player and a download link for *file_path*.

    Does nothing when *file_path* is falsy or the file does not exist.

    Args:
        file_path: Path of the audio file to expose in the Streamlit page.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    # Read through a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(file_path, "rb") as audio:
        encoded = base64.b64encode(audio.read()).decode()
    file_name = os.path.basename(file_path)
    # The file is embedded as a data: URI so the link works without serving it.
    dl_link = f'<a href="data:audio/mpeg;base64,{encoded}" download="{file_name}">Download {file_name}</a>'
    st.markdown(dl_link, unsafe_allow_html=True)
def save_input_as_md(user_name, text, prefix="input"):
    """Persist *text* as a markdown file inside ``SAVED_INPUTS_DIR``.

    The file starts with a ``# User:`` line and a ``**Timestamp:**`` line,
    followed by a blank line and the text itself. Its name is
    ``<prefix>_<YYYYmmdd_HHMMSS>_<slug>.md`` where the slug is derived from
    the first 50 characters of *text*.

    Args:
        user_name: Name recorded in the ``# User:`` header.
        text: Content to store.
        prefix: File-name prefix (callers in this file use ``"input"`` and
            ``"arxiv"``).

    Returns:
        The path of the written file, or ``None`` when *text* is empty or
        whitespace only (nothing is written in that case).
    """
    if not text or not text.strip():
        return None
    # Read the clock once so the file name and the header inside the file can
    # never disagree across a second boundary.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    safe_text = re.sub(r'[^\w\s-]', '', text[:50]).strip().lower()
    safe_text = re.sub(r'[-\s]+', '-', safe_text)
    fn = f"{prefix}_{timestamp}_{safe_text}.md"
    full_path = os.path.join(SAVED_INPUTS_DIR, fn)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(f"# User: {user_name}\n")
        f.write(f"**Timestamp:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write(text)
    return full_path
def list_saved_inputs():
    """Return the paths of every ``*.md`` file in ``SAVED_INPUTS_DIR``, sorted by path."""
    pattern = os.path.join(SAVED_INPUTS_DIR, "*.md")
    return sorted(glob.glob(pattern))
def parse_md_file(fpath):
    """Parse a markdown file with ``# User:`` / ``**Timestamp:**`` header lines.

    Only the leading header lines are treated as metadata. The header ends at
    the first line that is not a not-yet-seen header line (normally the blank
    separator line), so body lines that merely start with ``# User:`` or
    ``**Timestamp:**`` stay part of the content instead of overwriting the
    metadata.

    Args:
        fpath: Path of the markdown file to read (UTF-8).

    Returns:
        A ``(user, timestamp, content)`` tuple of strings. ``user`` and
        ``timestamp`` are empty when the corresponding header is absent.
        Every content line is stripped and the joined content is stripped of
        surrounding whitespace.
    """
    user_prefix = "# User:"
    ts_prefix = "**Timestamp:**"
    user_line = ""
    ts_line = ""
    user_seen = False
    ts_seen = False
    in_header = True
    content_lines = []
    with open(fpath, 'r', encoding='utf-8') as f:
        for line in f:
            if in_header:
                if not user_seen and line.startswith(user_prefix):
                    user_line = line[len(user_prefix):].strip()
                    user_seen = True
                    continue
                if not ts_seen and line.startswith(ts_prefix):
                    ts_line = line[len(ts_prefix):].strip()
                    ts_seen = True
                    continue
                # Anything else closes the header; the line itself is content.
                in_header = False
            content_lines.append(line.strip())
    content = "\n".join(content_lines).strip()
    return user_line, ts_line, content
def arxiv_search(query, max_results=3):
    """Query the arXiv export API and return formatted result strings.

    Args:
        query: Free-text search query.
        max_results: Maximum number of entries to request.

    Returns:
        A list with one string per entry, each containing the title, the PDF
        link (or ``(No PDF link)``) and the summary shortened to 300
        characters. An empty list is returned when the request fails, the
        server does not answer with HTTP 200, or the response is not
        well-formed XML.
    """
    base_url = "https://export.arxiv.org/api/query"
    params = {
        # Pass the raw query: requests URL-encodes it (spaces become '+').
        # Replacing spaces with '+' beforehand would send a literal '%2B'.
        'search_query': query,
        'start': 0,
        'max_results': max_results,
    }
    try:
        response = requests.get(base_url, params=params, timeout=30)
    except requests.RequestException:
        # Treat network failures like any other unsuccessful search.
        return []
    if response.status_code != 200:
        return []
    try:
        # Parse the raw bytes so the XML parser resolves the encoding itself.
        root = ET.fromstring(response.content)
    except ET.ParseError:
        return []

    ns = {"a": "http://www.w3.org/2005/Atom"}
    results = []
    for entry in root.findall('a:entry', ns):
        # findtext tolerates a missing element instead of raising AttributeError.
        title = (entry.findtext('a:title', default="", namespaces=ns) or "").strip()
        summary = (entry.findtext('a:summary', default="", namespaces=ns) or "").strip()
        # Extract the PDF link if available (the last matching link wins).
        pdf_link = None
        for link in entry.findall('a:link', ns):
            if link.get('type') == 'application/pdf':
                pdf_link = link.get('href')
        # Only mark the summary as truncated when it actually was.
        summary_short = summary[:300] + "..." if len(summary) > 300 else summary
        if pdf_link:
            formatted = f"Title: {title}\nPDF: {pdf_link}\nSummary: {summary_short}"
        else:
            formatted = f"Title: {title}\n(No PDF link)\nSummary: {summary_short}"
        results.append(formatted)
    return results
def summarize_arxiv_results(results):
    """Join formatted search results into one text block.

    Returns ``"No results found."`` when *results* is empty; otherwise the
    entries separated by blank lines.
    """
    return "\n\n".join(results) if results else "No results found."
def concatenate_mp3(files, output_file):
    """Write the bytes of every file in *files*, in order, to *output_file*.

    The parts are appended verbatim; no re-encoding takes place. An empty
    *files* sequence produces an empty output file.
    """
    def _read_bytes(path):
        with open(path, 'rb') as part:
            return part.read()

    with open(output_file, 'wb') as combined:
        # The generator is consumed lazily, so parts are read one at a time.
        combined.writelines(_read_bytes(path) for path in files)
st.title("πŸŽ™οΈ Voice Chat & ArXiv Search")

# ------------------ Sidebar: user picker and saved files ------------------
with st.sidebar:
    # The selected name is stored in session state; it tags saved inputs and
    # selects the TTS voice.
    st.session_state['user_name'] = st.selectbox("Current User:", USER_NAMES, index=0)

    saved_files = list_saved_inputs()
    st.write("πŸ“ Saved Inputs:")
    for fpath in saved_files:
        user, ts, content = parse_md_file(fpath)
        fname = os.path.basename(fpath)
        st.write(f"- {fname} (User: {user})")

    if st.button("πŸ—‘οΈ Clear All History"):
        for fpath in saved_files:
            os.remove(fpath)
        st.session_state['viewing_prefix'] = None
        st.success("All history cleared!")
        # st.experimental_rerun is a deprecated alias; prefer st.rerun (already
        # used at the end of this script) and fall back only on old releases.
        _rerun = getattr(st, "rerun", None) or st.experimental_rerun
        _rerun()
# Declare the custom Streamlit component whose frontend lives in the local
# "mycomponent" directory, then render it.
mycomponent = components.declare_component("mycomponent", path="mycomponent")
# Value sent back by the component. The Voice Chat tab treats it as a string
# (it calls .strip() on it) — presumably the recognised speech; the frontend
# is not visible in this file, so confirm against the component's source.
voice_val = mycomponent(my_input_value="Start speaking...")
# Tab containers, indexed below as tabs[0] (chat), tabs[1] (history), tabs[2] (settings).
tabs = st.tabs(["🎀 Voice Chat", "πŸ’Ύ History", "βš™οΈ Settings"])
# ------------------ Voice Chat Tab -------------------------
with tabs[0]:
    st.subheader("🎀 Voice Chat")
    if voice_val:
        voice_text = voice_val.strip()
        # Only react when the component reports a value different from the
        # one processed last, otherwise every rerun would repeat the search.
        input_changed = (voice_text != st.session_state.get('old_val'))
        if input_changed and voice_text:
            # 1. Save user input
            save_input_as_md(st.session_state['user_name'], voice_text, prefix="input")

            # 2. Perform ArXiv search automatically
            with st.spinner("Searching ArXiv..."):
                results = arxiv_search(voice_text)
                summary = summarize_arxiv_results(results)
                # Save as response
                save_input_as_md(st.session_state['user_name'], summary, prefix="arxiv")
            st.write(summary)

            # 3. Convert summary to audio and auto-play
            voice = USER_VOICES.get(st.session_state['user_name'], "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(summary, voice=voice)
            if audio_file:
                play_and_download_audio(audio_file)

            # 4. Update old_val to avoid repeated searches for same input
            st.session_state['old_val'] = voice_text

            # 5. Clear displayed text and re-run so next utterance starts fresh
            # NOTE(review): the rerun discards the summary and audio player
            # rendered just above; the result stays available in the History tab.
            # st.experimental_rerun is a deprecated alias; prefer st.rerun
            # (already used at the end of this script), falling back only on
            # old Streamlit releases.
            _rerun = getattr(st, "rerun", None) or st.experimental_rerun
            _rerun()

    st.write("Speak a query to automatically run an ArXiv search and read results aloud.")
# ------------------ History Tab -------------------------
with tabs[1]:
    st.subheader("πŸ’Ύ History")

    files = list_saved_inputs()
    conversation = []
    for fpath in files:
        user, ts, content = parse_md_file(fpath)
        conversation.append((user, ts, content, fpath))

    # File names sort by prefix first ("arxiv_" before "input_"), which is not
    # chronological. Order by the recorded timestamp instead; when a query and
    # its response share the same second, the "input_" file goes first.
    conversation.sort(
        key=lambda item: (item[1], not os.path.basename(item[3]).startswith("input_"))
    )

    # Newest entries first in the list view.
    for i, (user, ts, content, fpath) in enumerate(reversed(conversation), start=1):
        with st.expander(f"{ts} - {user}", expanded=False):
            st.write(content)
            # The key includes index and path so buttons with equal labels stay distinct.
            if st.button(f"πŸ”Š Read Aloud {ts}-{user}", key=f"read_{i}_{fpath}"):
                voice = USER_VOICES.get(user, "en-US-AriaNeural")
                audio_file = speak_with_edge_tts(content, voice=voice)
                if audio_file:
                    play_and_download_audio(audio_file)

    if st.button("πŸ“œ Read Entire Conversation"):
        # Oldest first, so the conversation is read in the order it happened.
        conversation_chrono = list(conversation)
        mp3_files = []
        for user, ts, content, fpath in conversation_chrono:
            voice = USER_VOICES.get(user, "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(content, voice=voice)
            if audio_file:
                mp3_files.append(audio_file)
                st.write(f"**{user} ({ts}):**")
                play_and_download_audio(audio_file)

        if mp3_files:
            combined_file = f"full_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            concatenate_mp3(mp3_files, combined_file)
            st.write("**Full Conversation Audio:**")
            play_and_download_audio(combined_file)
# ------------------ Settings Tab -------------------------
with tabs[2]:
    st.subheader("βš™οΈ Settings")
    st.write("Currently no additional settings. Use the sidebar to pick a user.")

# Honour a pending rerun request. The flag is cleared before rerunning so the
# request fires only once.
if st.session_state['should_rerun']:
    st.session_state['should_rerun'] = False
    st.rerun()