AI_Calling / app.py
akashxagi's picture
Update app.py
c815bab verified
import os
import sys
import time
import uuid
import requests
import streamlit as st
import dotenv
from streamlit_card import card
from annotated_text import annotated_text
import json
# Add project root to path so models/services are importable when running from ui/.
# The root is taken to be the parent of the directory that contains this file.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    # Insert at the front so the project's own packages are found first.
    sys.path.insert(0, _PROJECT_ROOT)
# Load environment variables (SERVER, SYSTEM_MESSAGE, INITIAL_MESSAGE, YOUR_NUMBER
# are read via os.getenv further down in this file).
dotenv.load_dotenv(verbose=True)
# Default system prompt template from shared module.
# HF Space should include `shared/`; if not, fall back to embedded constant.
try:
    from shared.prompt_templates import DEFAULT_SYSTEM_PROMPT_TEMPLATE
except ModuleNotFoundError:
    # Embedded fallback. The single-brace fields ({goal}, {contact_name}, ...)
    # are placeholders; the doubled braces around the META JSON example look
    # like str.format escapes — presumably the template is rendered with
    # str.format by whoever consumes it (not visible here; confirm).
    # The leading and trailing backslashes suppress the first and last newline.
    DEFAULT_SYSTEM_PROMPT_TEMPLATE = """\
You are an AI calling agent working under a campaign.
Campaign Goal: {goal}
--- CAMPAIGN KNOWLEDGE BASE ---
{campaign_knowledge_base}
--- CONTACT ---
Name: {contact_name}
{personalized_context_block}
Use both knowledge sources to conduct a natural, goal-oriented conversation. Prioritize personalized context over general knowledge.
Speak like a human, be concise, and adapt based on user responses.
Do not hallucinate. If unsure, ask.
--- OUTPUT FORMAT ---
Respond with the spoken reply first (what the caller should hear), then on a new line emit a metadata block exactly like this and nothing after it:
<<<META>>>{{"intent": "...", "next_action": "..."}}<<<END>>>
Intent values: greeting | qualifying | objecting | booking | ending | other.
Next_action values: ask_question | confirm | book_demo | schedule_followup | hang_up | transfer | other.\
"""
# Page configuration with custom theme: wide layout, sidebar open on load.
st.set_page_config(
    page_title="VoiceGenius AI Calling",
    page_icon="📞",
    layout="wide",
    initial_sidebar_state="expanded"
)
# Apply custom CSS. The stylesheet is injected as raw HTML; the class names
# defined here (transcript-message, user-message, assistant-message, stat-card,
# logo-text, ...) are referenced by the HTML snippets rendered later in this file.
st.markdown("""
<style>
.main {
background-color: #f5f7fa;
}
.block-container {
padding-top: 2rem;
padding-bottom: 2rem;
}
.stButton button {
border-radius: 20px;
font-weight: 500;
padding: 0.5rem 1.5rem;
transition: all 0.3s ease;
}
.stButton button:hover {
transform: translateY(-2px);
box-shadow: 0 4px 10px rgba(0,0,0,0.1);
}
.stTextInput input, .stTextArea textarea {
border-radius: 10px;
border: 1px solid #e0e0e0;
}
h1, h2, h3 {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.call-card {
background-color: white;
border-radius: 15px;
padding: 1.5rem;
box-shadow: 0 4px 15px rgba(0,0,0,0.05);
margin-bottom: 1rem;
}
.template-card {
cursor: pointer;
transition: all 0.2s ease;
}
.template-card:hover {
transform: translateY(-5px);
box-shadow: 0 8px 15px rgba(0,0,0,0.1);
}
.transcript-message {
padding: 10px 15px;
border-radius: 18px;
margin-bottom: 10px;
max-width: 80%;
line-height: 1.4;
}
.user-message {
background-color: #ed9121;
margin-left: auto;
border-bottom-right-radius: 5px;
}
.assistant-message {
background-color: #2e7ef7;
color: white;
margin-right: auto;
border-bottom-left-radius: 5px;
}
.status-indicator {
width: 12px;
height: 12px;
border-radius: 50%;
display: inline-block;
margin-right: 8px;
}
.status-active {
background-color: #4CAF50;
animation: pulse 1.5s infinite;
}
.status-inactive {
background-color: #9e9e9e;
}
@keyframes pulse {
0% {
opacity: 1;
}
50% {
opacity: 0.5;
}
100% {
opacity: 1;
}
}
.logo-text {
font-weight: 700;
background: linear-gradient(90deg, #2e7ef7, #1a56c5);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}
.stat-card {
text-align: center;
padding: 1rem;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.05);
}
.prompt-card {
padding: 15px;
border-radius: 12px;
cursor: pointer;
transition: all 0.2s ease;
height: 100%;
}
.prompt-card:hover {
transform: translateY(-3px);
}
.prompt-medical {
background: linear-gradient(135deg, #ff9a9e, #fad0c4);
}
.prompt-finance {
background: linear-gradient(135deg, #84fab0, #8fd3f4);
}
.prompt-sports {
background: linear-gradient(135deg, #a1c4fd, #c2e9fb);
}
.prompt-custom {
background: linear-gradient(135deg, #e0c3fc, #8ec5fc);
}
</style>
""", unsafe_allow_html=True)
# Initialize session state variables.
# The single-call defaults are seeded together, once per session; the presence
# of 'call_active' is the marker that this seeding has already happened.
if 'call_active' not in st.session_state:
    _single_call_defaults = {
        'call_active': False,
        'call_sid': None,
        'transcript': [],
        'system_message': os.getenv("SYSTEM_MESSAGE") or "You are a helpful AI assistant making a phone call. Be respectful, concise, and helpful.",
        'initial_message': os.getenv("INITIAL_MESSAGE") or "Hello, this is an AI assistant calling. How can I help you today?",
        'all_transcripts': [],
        'recording_info': None,
        'call_selector': "Current Call",
        'total_calls': 0,
        'successful_calls': 0,
        'call_duration': 0,
        'selected_template': None,
        'should_reset_selector': False,
    }
    for _state_key, _state_default in _single_call_defaults.items():
        st.session_state[_state_key] = _state_default
# Campaign session state: each key is seeded independently, only when missing.
_campaign_state_defaults = (
    ('app_mode', "Single Call"),
    ('selected_campaign_id', None),
    ('campaigns_cache', []),
    ('voice_profile_id', None),
    ('voice_profiles_cache', []),
    ('start_call_request_id', None),
    ('start_call_in_flight', False),
)
for _state_key, _state_default in _campaign_state_defaults:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Predefined prompt templates.
# Shape: template name -> {"system" | "initial"} -> language code -> text.
# apply_template() falls back to the "en" entry when a language code is missing.
prompt_templates = {
    "medical": {
        "system": {
            "en": "You are a medical appointment scheduling assistant for Dr. Smith's office. You should collect patient information, reason for appointment, and preferred times. You cannot give medical advice.",
            "es": "Eres un asistente de programación de citas médicas para la oficina del Dr. Smith. Debes recopilar información del paciente, motivo de la cita y horarios preferidos. No puedes dar consejos médicos."
        },
        "initial": {
            "en": "Hello, this is the virtual assistant from Dr. Smith's office.",
            "es": "Hola, soy el asistente virtual de la oficina del Dr. Smith"
        }
    },
    "finance": {
        "system": {
            "en": "You are a financial services assistant for GrowWealth Advisors. You can discuss appointment scheduling and general services offered, but cannot give specific investment advice during this call.",
            "es": "Eres un asistente de servicios financieros para GrowWealth Advisors. Puedes discutir la programación de citas y los servicios generales ofrecidos, pero no puedes dar consejos específicos de inversión durante esta llamada."
        },
        "initial": {
            "en": "Hello, this is the virtual assistant from GrowWealth Financial Advisors.",
            "es": "Hola, soy el asistente virtual de GrowWealth Financial Advisors."
        }
    },
    "sports": {
        "system": {
            "en": "You are a membership coordinator for SportsFit Gym. You should provide information about membership options, facility hours, and classes offered. Be enthusiastic and encouraging.",
            "es": "Eres un coordinador de membresías para el gimnasio SportsFit. Debes proporcionar información sobre las opciones de membresía, horarios de las instalaciones y clases ofrecidas. Sé entusiasta y alentador."
        },
        "initial": {
            "en": "Hi there! This is the virtual assistant from SportsFit Gym.",
            "es": "¡Hola! Soy el asistente virtual del gimnasio SportsFit."
        }
    },
    "customer_service": {
        "system": {
            "en": "You are a customer service representative following up on a recent purchase. You should check satisfaction levels, address any concerns, and offer assistance if needed.",
            "es": "Eres un representante de servicio al cliente dando seguimiento a una compra reciente. Debes verificar los niveles de satisfacción, atender cualquier inquietud y ofrecer asistencia si es necesario."
        },
        "initial": {
            "en": "Hello, this is the customer service team from Acme Products.",
            "es": "Hola, soy del equipo de servicio al cliente de Acme Products."
        }
    }
}
# Built-in voices per language code: display label -> voice id string.
# The values are what the campaign forms submit as "voice_id".
voice_options = {
    "en": {  # English voices
        "Emma (Female)": "11labs_emma",
        "Daniel (Male)": "11labs_daniel",
        "Rachel (Female)": "11labs_rachel",
        "John (Male)": "11labs_john"
    },
    "es": {  # Spanish voices
        "Sofia (Female)": "11labs_sofia",
        "Miguel (Male)": "11labs_miguel",
        "Isabella (Female)": "11labs_isabella",
        "Carlos (Male)": "11labs_carlos",
        "JuanRestrepoPro (Male)": "11labs_JuanRestrepoPro"
    }
}
# Helper functions
def fetch_all_transcripts():
    """Fetch every stored call transcript and refresh the sidebar counters.

    Requests ``/all_transcripts`` from the host named by the SERVER environment
    variable. On success, ``st.session_state.total_calls`` is set to the number
    of transcripts and ``st.session_state.successful_calls`` to the number that
    contain at least one message with role ``'user'``.

    Returns:
        The list of transcript dicts, or ``[]`` after showing an ``st.error``
        when the request fails or the body is not valid JSON.
    """
    try:
        # A timeout keeps a stalled server from hanging the Streamlit script run.
        response = requests.get(
            f"https://{os.getenv('SERVER')}/all_transcripts", timeout=15
        )
        # ``or []`` also covers an explicit JSON null.
        transcripts = response.json().get('transcripts') or []
    except (requests.RequestException, ValueError) as e:
        # ValueError covers non-JSON bodies on requests versions where the JSON
        # decode error is not a RequestException subclass.
        st.error(f"Error fetching call list: {str(e)}")
        return []
    st.session_state.total_calls = len(transcripts)
    st.session_state.successful_calls = sum(
        1
        for t in transcripts
        # .get() so one malformed message cannot abort the whole count.
        if any(m.get('role') == 'user' for m in (t.get('transcript') or []))
    )
    return transcripts
def format_duration(seconds):
    """Format a duration in seconds as ``"Ns"`` or ``"Mm Ns"``.

    Accepts ints, floats and numeric strings; the value is truncated to whole
    seconds. Missing, non-numeric or negative input is treated as zero, so the
    function never raises on bad data.

    Args:
        seconds: Duration in seconds (int, float, numeric str, or None).

    Returns:
        ``"45s"`` for values under a minute, otherwise e.g. ``"2m 5s"``.
    """
    try:
        whole_seconds = int(float(seconds))
    except (TypeError, ValueError, OverflowError):
        # None, garbage strings, NaN and infinity all land here.
        whole_seconds = 0
    whole_seconds = max(whole_seconds, 0)
    minutes, remaining_seconds = divmod(whole_seconds, 60)
    if minutes == 0:
        return f"{remaining_seconds}s"
    return f"{minutes}m {remaining_seconds}s"
def apply_template(template_name):
    """Load a predefined prompt template into session state.

    Looks up ``template_name`` in ``prompt_templates`` and copies its system
    and initial messages, in the currently selected language (falling back to
    English), into ``st.session_state``; also records the template name as
    ``selected_template``.

    Returns:
        True when the template exists and was applied, False otherwise.
    """
    if template_name not in prompt_templates:
        return False
    # language_options / selected_language are module-level names defined
    # elsewhere in this file.
    lang_code = language_options[selected_language]
    system_by_lang = prompt_templates[template_name]["system"]
    st.session_state.system_message = system_by_lang.get(lang_code, system_by_lang["en"])
    initial_by_lang = prompt_templates[template_name]["initial"]
    st.session_state.initial_message = initial_by_lang.get(lang_code, initial_by_lang["en"])
    st.session_state.selected_template = template_name
    return True
def fetch_recording_info(call_sid):
    """Look up the recording for a call and return its playable URL and duration.

    Two requests are made: first ``/call_recording/<call_sid>`` on the host
    named by SERVER, whose JSON may carry a ``recording_url``; then that URL
    itself, whose JSON is expected to carry ``media_url`` and ``duration``.

    Args:
        call_sid: Identifier of the call whose recording is wanted.

    Returns:
        ``{'url': '<media_url>.mp3', 'duration': <duration or 0>}``, or None
        when there is no recording, the metadata request does not return 200,
        the metadata has no ``media_url``, or a request fails (in which case an
        ``st.error`` is shown).
    """
    try:
        # Timeouts keep a stalled server from hanging the Streamlit script run.
        response = requests.get(
            f"https://{os.getenv('SERVER')}/call_recording/{call_sid}", timeout=15
        )
        media_url = response.json().get('recording_url')
        if not media_url:
            return None
        media_response = requests.get(media_url, timeout=15)
        if media_response.status_code != 200:
            return None
        media_data = media_response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers bodies that are not valid JSON.
        st.error(f"Error fetching recording info: {str(e)}")
        return None
    playable_base = media_data.get('media_url')
    if not playable_base:
        # Without this guard the result would be the bogus URL "None.mp3".
        return None
    return {
        'url': f"{playable_base}.mp3",
        'duration': media_data.get('duration', 0)
    }
def _extract_call_sid(selector_label):
    """Return the call SID embedded in a call-selector label.

    Handles labels of the form '[AI] Call CA...' / '[MANUAL] Call CA...' and
    plain 'Call CA...'; any other label is returned unchanged.
    """
    # Bracketed prefix: keep everything after the first "] Call ".
    _, marker, tail = selector_label.partition("] Call ")
    if marker:
        return tail
    plain_prefix = "Call "
    if selector_label.startswith(plain_prefix):
        return selector_label[len(plain_prefix):]
    return selector_label
def on_call_selector_change():
    """React to a change of the call selector stored in session state.

    Selecting "Current Call" clears ``recording_info``. Selecting a past call
    looks its transcript up in ``all_transcripts`` by call SID and stores the
    result of ``fetch_recording_info`` for it; a warning is shown when no
    transcript matches.
    """
    if st.session_state.call_selector == "Current Call":
        st.session_state.recording_info = None
        return
    wanted_sid = _extract_call_sid(st.session_state.call_selector)
    found = None
    for candidate in st.session_state.all_transcripts:
        if candidate['call_sid'] == wanted_sid:
            found = candidate
            break
    if not found:
        st.warning("No transcript found for the selected call.")
        return
    st.session_state.recording_info = fetch_recording_info(found['call_sid'])
# Check if we need to reset the selector: when should_reset_selector has been
# raised, point the selector back at "Current Call" and clear the flag.
# NOTE(review): presumably this has to run before the selector widget is
# created on this run — the widget is not visible in this part of the file.
if st.session_state.should_reset_selector:
    st.session_state.call_selector = "Current Call"
    st.session_state.should_reset_selector = False
# ---------------------------------------------------------------------------
# Campaign API helpers
# ---------------------------------------------------------------------------
def _BASE():
    """Return the API base URL, built from the SERVER environment variable."""
    # Read on every call, so a change to SERVER takes effect immediately.
    return f"https://{os.getenv('SERVER')}"
def api_list_voice_profiles(usable_only=False):
    """List voice profiles via GET /voices.

    ``usable_only`` is sent as a lowercase "true"/"false" query parameter.
    Best-effort: returns the response's "voices" list, or [] when the key is
    absent or anything at all goes wrong.
    """
    try:
        query = {"usable_only": str(usable_only).lower()}
        response = requests.get(f"{_BASE()}/voices", params=query, timeout=15)
        payload = response.json()
        return payload.get("voices", [])
    except Exception:
        return []
def api_create_voice_clone(display_name, description, labels_json, consent_confirmed, files):
    """Create a cloned voice via multipart POST /voices/clones.

    ``files`` are uploaded-file objects exposing ``name``, ``getvalue()`` and
    ``type``; a missing content type defaults to "audio/mpeg". ``labels_json``
    is forwarded as-is and ``consent_confirmed`` as "true"/"false".

    Returns the decoded JSON response, or ``{"error": message}`` on an HTTP
    status >= 400 or on any exception.
    """
    try:
        audio_parts = [
            ("files", (sample.name, sample.getvalue(), sample.type or "audio/mpeg"))
            for sample in files
        ]
        form_fields = {
            "display_name": display_name,
            "description": description,
            "labels": labels_json,
            "consent_confirmed": str(bool(consent_confirmed)).lower(),
        }
        response = requests.post(
            f"{_BASE()}/voices/clones", data=form_fields, files=audio_parts, timeout=90
        )
        payload = response.json()
        if response.status_code < 400:
            return payload
        return {"error": payload.get("error", f"Server error ({response.status_code})")}
    except Exception as exc:
        return {"error": str(exc)}
def api_register_voice(display_name, provider_voice_id, description, labels):
    """Register an existing provider voice via POST /voices/register.

    ``labels`` defaults to an empty dict when falsy, and ``consent_confirmed``
    is always sent as True.

    Returns the decoded JSON response, or ``{"error": message}`` on an HTTP
    status >= 400 or on any exception.
    """
    try:
        body = {
            "display_name": display_name,
            "provider_voice_id": provider_voice_id,
            "description": description,
            "labels": labels or {},
            "consent_confirmed": True,
        }
        response = requests.post(f"{_BASE()}/voices/register", json=body, timeout=30)
        payload = response.json()
        if response.status_code < 400:
            return payload
        return {"error": payload.get("error", f"Server error ({response.status_code})")}
    except Exception as exc:
        return {"error": str(exc)}
def api_delete_voice_profile(voice_profile_id, delete_remote=False):
    """Delete a voice profile via DELETE /voices/<id>.

    ``delete_remote`` is sent as a "true"/"false" query parameter. Returns the
    decoded JSON response whatever the HTTP status, or ``{"error": message}``
    when the request or JSON decoding raises.
    """
    try:
        query = {"delete_remote": str(bool(delete_remote)).lower()}
        response = requests.delete(
            f"{_BASE()}/voices/{voice_profile_id}", params=query, timeout=30
        )
        return response.json()
    except Exception as exc:
        return {"error": str(exc)}
def api_list_campaigns():
    """List campaigns via GET /campaigns.

    Best-effort: returns the response's "campaigns" list, or [] when the key
    is absent or anything goes wrong.
    """
    try:
        response = requests.get(f"{_BASE()}/campaigns", timeout=10)
        payload = response.json()
        return payload.get("campaigns", [])
    except Exception:
        return []
def api_get_campaign(campaign_id):
    """Fetch one campaign via GET /campaigns/<id>.

    Best-effort: returns the decoded JSON response, or {} on any exception.
    """
    endpoint = f"{_BASE()}/campaigns/{campaign_id}"
    try:
        return requests.get(endpoint, timeout=10).json()
    except Exception:
        return {}
def api_create_campaign(data):
    """Create a campaign via POST /campaigns with ``data`` as the JSON body.

    Returns the decoded JSON response, or ``{"error": message}`` on an HTTP
    status >= 400 or on any exception.
    """
    try:
        response = requests.post(f"{_BASE()}/campaigns", json=data, timeout=10)
        payload = response.json()
        if response.status_code < 400:
            return payload
        return {"error": payload.get("error", f"Server error ({response.status_code})")}
    except Exception as exc:
        return {"error": str(exc)}
def api_update_campaign(campaign_id, data):
    """Update a campaign via PATCH /campaigns/<id> with ``data`` as JSON.

    Returns the decoded JSON response, or ``{"error": message}`` on an HTTP
    status >= 400 or on any exception.
    """
    try:
        response = requests.patch(
            f"{_BASE()}/campaigns/{campaign_id}", json=data, timeout=10
        )
        payload = response.json()
        if response.status_code < 400:
            return payload
        return {"error": payload.get("error", f"Server error ({response.status_code})")}
    except Exception as exc:
        return {"error": str(exc)}
def api_delete_campaign(campaign_id):
    """Delete a campaign via DELETE /campaigns/<id>.

    Returns the decoded JSON response whatever the HTTP status, or
    ``{"error": message}`` when the request or JSON decoding raises.
    """
    endpoint = f"{_BASE()}/campaigns/{campaign_id}"
    try:
        return requests.delete(endpoint, timeout=10).json()
    except Exception as exc:
        return {"error": str(exc)}
def api_list_contacts(campaign_id, status=None):
    """List a campaign's contacts via GET /campaigns/<id>/contacts.

    Args:
        campaign_id: Campaign whose contacts are wanted.
        status: Optional status filter. When truthy it is sent as the
            ``status`` query parameter.

    Returns:
        The response's "contacts" list, or [] when the key is absent or
        anything goes wrong (best-effort, errors are swallowed).
    """
    try:
        # Let requests build the query string so the value is URL-encoded
        # (e.g. a status containing "&" or spaces), as the voices helper does.
        query = {"status": status} if status else None
        r = requests.get(
            f"{_BASE()}/campaigns/{campaign_id}/contacts", params=query, timeout=10
        )
        return r.json().get("contacts", [])
    except Exception:
        return []
def api_upload_contacts(campaign_id, file_bytes, filename):
    """Upload a contacts file via multipart POST /campaigns/<id>/contacts/upload.

    The bytes are sent under the form field "file" with the given filename.
    Returns the decoded JSON response whatever the HTTP status, or
    ``{"error": message}`` when the request or JSON decoding raises.
    """
    try:
        endpoint = f"{_BASE()}/campaigns/{campaign_id}/contacts/upload"
        attachment = {"file": (filename, file_bytes)}
        response = requests.post(endpoint, files=attachment, timeout=30)
        return response.json()
    except Exception as exc:
        return {"error": str(exc)}
def api_call_contact(campaign_id, contact_id):
    """Start a call to one contact via POST /campaigns/<id>/contacts/<id>/call.

    Returns the decoded JSON response whatever the HTTP status, or
    ``{"error": message}`` when the request or JSON decoding raises.
    """
    try:
        endpoint = f"{_BASE()}/campaigns/{campaign_id}/contacts/{contact_id}/call"
        response = requests.post(endpoint, timeout=10)
        return response.json()
    except Exception as exc:
        return {"error": str(exc)}
def api_start_campaign(campaign_id, max_concurrent=1):
    """Start a campaign via POST /campaigns/<id>/start.

    ``max_concurrent`` is sent in the JSON body. Returns the decoded JSON
    response whatever the HTTP status, or ``{"error": message}`` when the
    request or JSON decoding raises.
    """
    try:
        endpoint = f"{_BASE()}/campaigns/{campaign_id}/start"
        body = {"max_concurrent": max_concurrent}
        response = requests.post(endpoint, json=body, timeout=10)
        return response.json()
    except Exception as exc:
        return {"error": str(exc)}
def api_stop_campaign(campaign_id):
    """Request a campaign stop via POST /campaigns/<id>/stop.

    Returns the decoded JSON response whatever the HTTP status, or
    ``{"error": message}`` when the request or JSON decoding raises.
    """
    endpoint = f"{_BASE()}/campaigns/{campaign_id}/stop"
    try:
        return requests.post(endpoint, timeout=10).json()
    except Exception as exc:
        return {"error": str(exc)}
def api_list_calls(campaign_id):
    """List a campaign's calls via GET /campaigns/<id>/calls.

    Best-effort: returns the response's "calls" list, or [] when the key is
    absent or anything goes wrong.
    """
    try:
        response = requests.get(f"{_BASE()}/campaigns/{campaign_id}/calls", timeout=10)
        payload = response.json()
        return payload.get("calls", [])
    except Exception:
        return []
def api_get_call_detail(campaign_id, call_id):
    """Fetch one call's detail via GET /campaigns/<id>/calls/<call_id>.

    Best-effort: returns the decoded JSON response, or {} on any exception.
    """
    endpoint = f"{_BASE()}/campaigns/{campaign_id}/calls/{call_id}"
    try:
        return requests.get(endpoint, timeout=10).json()
    except Exception:
        return {}
def _sample_csv() -> bytes:
    """Return the bytes of the sample contacts CSV offered for download.

    One header row plus two example contacts, each line newline-terminated.
    """
    rows = (
        "name,phone,personalized_context,voice_profile_name",
        "Jane Doe,+14155551234,Prefers morning calls. Interested in pro tier.,",
        "John Roe,+14085550000,,",
    )
    return "".join(f"{row}\n" for row in rows).encode("ascii")
def _voice_status_badge(status: str) -> str:
    """Map a voice profile status to the short label shown in the UI.

    Known statuses use the table below; any other non-empty status is shown
    unchanged, and an empty or missing status becomes "unknown".
    """
    known_labels = {
        "active": "active",
        "pending_verification": "pending",
        "failed": "failed",
        "creating": "creating",
    }
    if status in known_labels:
        return known_labels[status]
    return status if status else "unknown"
def _parse_labels_json(raw_text):
    """Parse the text of a "Labels JSON" field.

    Returns a ``(labels, error)`` pair: ``labels`` is a dict and ``error`` is
    None on success; on failure ``labels`` is None and ``error`` is a
    user-facing message. Blank text is treated as ``{}``.
    """
    try:
        labels = json.loads(raw_text.strip() or "{}")
        if not isinstance(labels, dict):
            raise ValueError("Labels must be a JSON object")
    except (ValueError, TypeError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        return None, f"Invalid labels JSON: {exc}"
    return labels, None


def render_voice_library_panel():
    """Render the Voice Library panel and return the current voice profiles.

    The panel has three expanders: one to create a cloned voice from uploaded
    samples, one to register an existing ElevenLabs voice id, and one to list
    and delete stored profiles. Both forms validate their required fields and
    the labels JSON locally before calling the API, and the script is rerun
    after a successful create/register/delete so the list is refreshed.

    Returns:
        The list returned by ``api_list_voice_profiles(usable_only=False)``;
        ``[]`` when there are no profiles.
    """
    st.subheader("Voice Library")
    st.caption("Create or register ElevenLabs voices for single calls and campaigns.")

    with st.expander("Create Clone (IVC)", expanded=False):
        clone_name = st.text_input("Display Name", key="clone_name")
        clone_desc = st.text_area("Description", key="clone_desc", height=80)
        clone_labels_text = st.text_input("Labels JSON", value="{}", key="clone_labels_json")
        clone_files = st.file_uploader(
            "Upload sample audio files",
            type=["mp3", "wav", "m4a", "flac", "ogg"],
            accept_multiple_files=True,
            key="clone_files_uploader",
        )
        consent = st.checkbox(
            "I confirm I have consent/rights to clone and use this voice.",
            key="clone_consent_checkbox",
        )
        if st.button("Create Clone Voice", key="create_clone_voice_btn", use_container_width=True):
            # Validate the labels here too, so malformed JSON is reported
            # before any audio is uploaded.
            _, labels_error = _parse_labels_json(clone_labels_text)
            if not clone_name.strip():
                st.error("Display name is required.")
            elif not clone_files:
                st.error("Upload at least one sample file.")
            elif not consent:
                st.error("Consent confirmation is required.")
            elif labels_error:
                st.error(labels_error)
            else:
                result = api_create_voice_clone(
                    display_name=clone_name.strip(),
                    description=clone_desc.strip(),
                    # The API helper forwards the labels as raw JSON text.
                    labels_json=clone_labels_text.strip() or "{}",
                    consent_confirmed=consent,
                    files=clone_files,
                )
                if result.get("error"):
                    st.error(result["error"])
                else:
                    st.success(f"Voice clone created: {result.get('display_name')}")
                    st.rerun()

    with st.expander("Register Existing Voice ID", expanded=False):
        reg_name = st.text_input("Display Name", key="register_voice_name")
        reg_voice_id = st.text_input("ElevenLabs Voice ID", key="register_provider_voice_id")
        reg_desc = st.text_area("Description", key="register_voice_desc", height=60)
        reg_labels_text = st.text_input("Labels JSON", value="{}", key="register_labels_json")
        if st.button("Register Voice", key="register_voice_btn", use_container_width=True):
            labels, labels_error = _parse_labels_json(reg_labels_text)
            # Same required-field checks as the clone form, so blank values
            # are caught here instead of being posted to the server.
            if not reg_name.strip():
                st.error("Display name is required.")
            elif not reg_voice_id.strip():
                st.error("ElevenLabs Voice ID is required.")
            elif labels_error:
                st.error(labels_error)
            else:
                res = api_register_voice(
                    reg_name.strip(), reg_voice_id.strip(), reg_desc.strip(), labels
                )
                if res.get("error"):
                    st.error(res["error"])
                else:
                    st.success(f"Voice registered: {res.get('display_name')}")
                    st.rerun()

    voices = api_list_voice_profiles(usable_only=False)
    if not voices:
        st.info("No voice profiles yet.")
        return []

    with st.expander(f"Manage Voices ({len(voices)})", expanded=False):
        for v in voices:
            provider_voice = v.get("provider_voice_id") or "-"
            status = _voice_status_badge(v.get("status", "unknown"))
            st.markdown(
                f"**{v.get('display_name','Unnamed')}**  \n"
                f"`{v.get('id')}`  \n"
                f"status: **{status}**  \n"
                f"provider_voice_id: `{provider_voice}`"
            )
            col1, col2 = st.columns(2)
            # Widget keys embed the profile id so each row's controls are unique.
            delete_remote = col1.checkbox("Delete remote too", key=f"del_remote_{v['id']}")
            if col2.button("Delete", key=f"del_voice_{v['id']}", use_container_width=True):
                res = api_delete_voice_profile(v["id"], delete_remote=delete_remote)
                if res.get("error"):
                    st.error(res["error"])
                else:
                    st.success("Voice profile deleted.")
                    st.rerun()
            st.divider()
    return voices
# ---------------------------------------------------------------------------
# Campaign UI renderers
# ---------------------------------------------------------------------------
def render_campaigns_sidebar(campaigns):
    """Render the campaign picker, contact counters and start/stop control.

    The selectbox lists "+ New Campaign" followed by every campaign name and
    keeps ``st.session_state.selected_campaign_id`` in sync with the choice.
    For a selected campaign it shows pending / done / failed counts and a
    Start or Stop button, depending on the campaign's ``is_running`` flag.

    Args:
        campaigns: Campaign dicts with at least "id" and "name"; the optional
            "contact_counts" and "is_running" entries are read when present.
    """
    st.subheader("Campaign")
    new_campaign_label = "+ New Campaign"
    campaign_names = [new_campaign_label] + [c["name"] for c in campaigns]

    # Preselect the campaign remembered in session state, when it still exists.
    current_name = new_campaign_label
    if st.session_state.selected_campaign_id:
        match = next((c for c in campaigns if c["id"] == st.session_state.selected_campaign_id), None)
        if match:
            current_name = match["name"]
    default_idx = campaign_names.index(current_name) if current_name in campaign_names else 0
    chosen = st.selectbox("Select campaign", campaign_names, index=default_idx, key="campaign_selector_sb")

    if chosen == new_campaign_label:
        st.session_state.selected_campaign_id = None
    else:
        # NOTE(review): campaigns are resolved by name, so two campaigns with
        # the same name always resolve to the first one.
        c = next((x for x in campaigns if x["name"] == chosen), None)
        if c:
            st.session_state.selected_campaign_id = c["id"]

    if not st.session_state.selected_campaign_id:
        return

    c_detail = next((x for x in campaigns if x["id"] == st.session_state.selected_campaign_id), {})
    counts = c_detail.get("contact_counts", {})
    pending = counts.get("pending", 0)
    done = counts.get("completed", 0)
    # "no-answer" contacts are reported together with failures.
    failed = counts.get("failed", 0) + counts.get("no-answer", 0)
    col1, col2, col3 = st.columns(3)
    col1.metric("Pending", pending)
    col2.metric("Done", done)
    col3.metric("Failed", failed)

    if c_detail.get("is_running", False):
        st.markdown("**Status:** Running")
        if st.button("Stop Campaign", key="stop_camp_btn", use_container_width=True):
            res = api_stop_campaign(st.session_state.selected_campaign_id)
            if res.get("error"):
                # No rerun on failure: a rerun would erase the error message.
                st.error(res["error"])
            else:
                st.success("Stop requested.")
                st.rerun()
    else:
        if st.button("Start Campaign", key="start_camp_btn",
                     disabled=pending == 0, use_container_width=True):
            res = api_start_campaign(st.session_state.selected_campaign_id)
            if res.get("error"):
                st.error(res["error"])
            else:
                st.success(f"Started — {res.get('count', 0)} contacts queued.")
                st.rerun()
def _voice_label_for(vp_label_to_id, voice_profile_id):
    """Return the selectbox label mapped to ``voice_profile_id``, else "None"."""
    for label, pid in vp_label_to_id.items():
        if pid == voice_profile_id:
            return label
    return "None"


def _render_create_campaign_form(vp_label_to_id):
    """Render the "Create New Campaign" form and create the campaign on submit."""
    vp_labels = list(vp_label_to_id)
    st.subheader("Create New Campaign")
    # NOTE(review): widgets inside st.form do not rerun the script until the
    # form is submitted, so the voice list cannot follow the chosen language
    # and the prompt text area only appears after a submit — confirm intent.
    with st.form("new_campaign_form"):
        name = st.text_input("Campaign Name *", placeholder="Q2 Demo Outreach")
        goal = st.text_input("Campaign Goal *", placeholder="Book a product demo")
        knowledge_base = st.text_area(
            "Knowledge Base",
            placeholder="Paste product info, FAQs, pricing, talking points…",
            height=200,
        )
        col1, col2 = st.columns(2)
        with col1:
            lang = st.selectbox("Language", ["en", "es"])
            model = st.selectbox("AI Model", ["openai", "anthropic"])
        with col2:
            voice_id = st.selectbox("Voice", list(voice_options.get(lang, voice_options["en"]).values()))
            default_voice_profile_label = st.selectbox(
                "Default Cloned Voice (optional)",
                options=vp_labels,
                index=0,
                help="Campaign-level default cloned voice. Contact override can still replace this.",
            )
        initial_msg = st.text_input("Initial Message (use {name})",
                                    value="Hi {name}, this is an AI assistant calling. How are you today?")
        customize_prompt = st.checkbox("Customize system prompt template")
        if customize_prompt:
            prompt_tmpl = st.text_area("System Prompt Template", value=DEFAULT_SYSTEM_PROMPT_TEMPLATE,
                                       height=300)
        else:
            prompt_tmpl = DEFAULT_SYSTEM_PROMPT_TEMPLATE
        submitted = st.form_submit_button("Create Campaign")

    if not submitted:
        return
    if not name or not goal:
        st.error("Campaign Name and Goal are required.")
        return
    res = api_create_campaign({
        "name": name,
        "goal": goal,
        "knowledge_base": knowledge_base,
        "system_prompt_template": prompt_tmpl,
        "initial_message_template": initial_msg,
        "language": lang,
        "model": model,
        "voice_id": voice_id,
        "default_voice_profile_id": vp_label_to_id[default_voice_profile_label],
    })
    if res.get("error"):
        st.error(res["error"])
        return
    st.session_state.selected_campaign_id = res["id"]
    st.success(f"Campaign '{res['name']}' created!")
    st.rerun()


def _render_campaign_settings_tab(cid, campaign, vp_label_to_id):
    """Render the Settings tab: edit form plus save / delete handling."""
    vp_labels = list(vp_label_to_id)
    st.subheader(f"Campaign: {campaign['name']}")
    with st.form("edit_campaign_form"):
        name = st.text_input("Name", value=campaign["name"])
        goal = st.text_input("Goal", value=campaign["goal"])
        knowledge_base = st.text_area("Knowledge Base", value=campaign.get("knowledge_base", ""), height=250)
        col1, col2 = st.columns(2)
        with col1:
            lang = st.selectbox("Language", ["en", "es"],
                                index=0 if campaign.get("language", "en") == "en" else 1)
            model = st.selectbox("AI Model", ["openai", "anthropic"],
                                 index=0 if campaign.get("model", "openai") == "openai" else 1)
        with col2:
            voice_vals = list(voice_options.get(lang, voice_options["en"]).values())
            cur_voice = campaign.get("voice_id") or voice_vals[0]
            voice_idx = voice_vals.index(cur_voice) if cur_voice in voice_vals else 0
            voice_id = st.selectbox("Voice", voice_vals, index=voice_idx)
            current_default_label = _voice_label_for(
                vp_label_to_id, campaign.get("default_voice_profile_id")
            )
            default_voice_profile_label = st.selectbox(
                "Default Cloned Voice (optional)",
                options=vp_labels,
                index=vp_labels.index(current_default_label),
            )
        initial_msg = st.text_input("Initial Message (use {name})",
                                    value=campaign.get("initial_message_template", "Hi {name}, how are you?"))
        customize = st.checkbox("Edit system prompt template")
        stored_prompt = campaign.get("system_prompt_template", DEFAULT_SYSTEM_PROMPT_TEMPLATE)
        if customize:
            prompt_tmpl = st.text_area("System Prompt Template", value=stored_prompt, height=300)
        else:
            prompt_tmpl = stored_prompt
        col_save, col_del = st.columns([3, 1])
        save = col_save.form_submit_button("Save Changes")
        delete = col_del.form_submit_button("Delete Campaign", type="secondary")

    if save:
        res = api_update_campaign(cid, {
            "name": name, "goal": goal, "knowledge_base": knowledge_base,
            "language": lang, "model": model, "voice_id": voice_id,
            "default_voice_profile_id": vp_label_to_id[default_voice_profile_label],
            "initial_message_template": initial_msg,
            "system_prompt_template": prompt_tmpl,
        })
        if res.get("error"):
            # No rerun on failure: a rerun would erase the error message.
            st.error(res["error"])
        else:
            st.success("Saved.")
            st.rerun()
    if delete:
        res = api_delete_campaign(cid)
        if res.get("error"):
            # Keep the selection so the user can retry the delete.
            st.error(res["error"])
        else:
            st.session_state.selected_campaign_id = None
            st.success("Campaign deleted.")
            st.rerun()


def _render_campaign_contacts_tab(cid, vp_label_to_id):
    """Render the Contacts tab: bulk upload, single add, and the contact list."""
    import html  # local import: used only to escape text placed into raw HTML

    vp_labels = list(vp_label_to_id)
    st.subheader("Upload Contacts")
    col_dl, col_up = st.columns([1, 3])
    with col_dl:
        st.download_button(
            "Download Sample CSV",
            data=_sample_csv(),
            file_name="contacts_template.csv",
            mime="text/csv",
        )
    with col_up:
        uploaded = st.file_uploader("CSV or Excel file", type=["csv", "xlsx", "xls"], key="contact_uploader")
        if uploaded:
            # The uploader keeps returning the same file on every rerun, so
            # remember what has been sent and upload each file only once.
            upload_signature = (
                cid, uploaded.name, uploaded.size, getattr(uploaded, "file_id", None)
            )
            if st.session_state.get("_last_contact_upload") != upload_signature:
                res = api_upload_contacts(cid, uploaded.getvalue(), uploaded.name)
                if res.get("error"):
                    st.error(res["error"])
                else:
                    st.session_state["_last_contact_upload"] = upload_signature
                    st.success(
                        f"Inserted: {res.get('inserted', 0)} | "
                        f"Updated: {res.get('updated', 0)} | "
                        f"Skipped: {res.get('skipped', 0)}"
                    )
                    if res.get("errors"):
                        with st.expander(f"Upload errors ({len(res['errors'])})"):
                            for e in res["errors"]:
                                st.warning(f"Row {e['row']}, {e['field']}: {e['message']}")
                    # No rerun: the contact list below is fetched later in
                    # this same run, and a rerun would hide the summary.

    st.divider()
    st.subheader("Add Single Contact")
    with st.form("add_contact_form", clear_on_submit=True):
        c1, c2 = st.columns(2)
        c_name = c1.text_input("Name *")
        c_phone = c2.text_input("Phone *", placeholder="+14155551234")
        c_ctx = st.text_area("Personalized Context (optional)", height=80)
        c_voice_profile_label = st.selectbox(
            "Per-contact Cloned Voice Override",
            options=vp_labels,
            index=0,
            help="Optional. Overrides campaign default cloned voice for this contact.",
        )
        add_submitted = st.form_submit_button("Add Contact")
    if add_submitted:
        if not c_name or not c_phone:
            st.error("Name and phone are required.")
        else:
            try:
                res = requests.post(
                    f"{_BASE()}/campaigns/{cid}/contacts",
                    json={
                        "name": c_name,
                        "phone": c_phone,
                        "personalized_context": c_ctx or None,
                        "voice_profile_id": vp_label_to_id[c_voice_profile_label],
                    },
                    timeout=10,
                ).json()
            except (requests.RequestException, ValueError) as exc:
                # A network failure or non-JSON body must not crash the page.
                res = {"error": str(exc)}
            if res.get("error"):
                st.error(res["error"])
            else:
                st.success(f"Added {res.get('name', c_name)}.")
                st.rerun()

    st.divider()
    st.subheader("Contact List")
    contacts = api_list_contacts(cid)
    if not contacts:
        st.info("No contacts yet. Upload a CSV or add one above.")
        return

    # Status color badges
    status_colors = {
        "pending": "gray", "queued": "blue", "in_progress": "orange",
        "completed": "green", "failed": "red", "no-answer": "orange",
    }
    for contact in contacts:
        contact_id = contact["id"]
        status = contact["status"]
        color = status_colors.get(status, "gray")
        context_preview = (contact.get("personalized_context") or "")[:60]
        contact_voice_label = _voice_label_for(vp_label_to_id, contact.get("voice_profile_id"))
        col_info, col_voice, col_btn = st.columns([4, 2, 1])
        with col_info:
            # Contact fields come from uploads; escape them because the line
            # is rendered with unsafe_allow_html. The phone sits in a code span.
            safe_name = html.escape(str(contact["name"]), quote=False)
            safe_status = html.escape(str(status), quote=False)
            safe_context = html.escape(context_preview, quote=False)
            st.markdown(
                f"**{safe_name}** &nbsp; `{contact['phone']}` &nbsp; "
                f"<span style='color:{color}'>●</span> {safe_status}"
                + (f"<br><small>{safe_context}</small>" if context_preview else ""),
                unsafe_allow_html=True,
            )
        with col_voice:
            selected_contact_voice = st.selectbox(
                "Voice override",
                options=vp_labels,
                index=vp_labels.index(contact_voice_label),
                key=f"contact_voice_profile_{contact_id}",
                label_visibility="collapsed",
            )
            if st.button("Save Voice", key=f"save_contact_voice_{contact_id}", use_container_width=True):
                try:
                    save_res = requests.patch(
                        f"{_BASE()}/campaigns/{cid}/contacts/{contact_id}",
                        json={"voice_profile_id": vp_label_to_id[selected_contact_voice]},
                        timeout=10,
                    ).json()
                except (requests.RequestException, ValueError) as exc:
                    save_res = {"error": str(exc)}
                if save_res.get("error"):
                    st.error(save_res["error"])
                else:
                    st.success("Voice override updated.")
                    st.rerun()
        with col_btn:
            if st.button("Call", key=f"call_{contact_id}"):
                res = api_call_contact(cid, contact_id)
                if res.get("error"):
                    st.error(res["error"])
                else:
                    # ``or ''`` guards against an explicit null call_sid.
                    st.success(f"Call started: {(res.get('call_sid') or '')[:12]}…")
                    st.rerun()


def _render_campaign_calls_tab(cid):
    """Render the Call History tab with per-call transcript and analytics."""
    import html  # local import: used only to escape transcript text

    st.subheader("Call History")
    if st.button("Refresh", key="refresh_calls"):
        st.rerun()
    calls = api_list_calls(cid)
    if not calls:
        st.info("No calls yet.")
        return

    # role -> (CSS class, speaker label); other roles are not rendered.
    speakers = {
        "user": ("user-message", "Caller"),
        "assistant": ("assistant-message", "AI"),
    }
    status_dots = {"completed": "🟢", "failed": "🔴", "no-answer": "🟠"}
    for call in calls:
        status = call.get("final_status") or "in_progress"
        dot = status_dots.get(status, "🔵")
        header = (
            f"{call['contact_name']} {call['contact_phone']} — "
            f"{dot} {status} | {(call.get('started_at') or '')[:16]}"
        )
        with st.expander(header, expanded=False):
            # NOTE(review): this issues one detail request per call on every
            # script run, whether or not the expander is open.
            detail = api_get_call_detail(cid, call["id"])
            transcript = detail.get("transcript", [])
            meta_events = detail.get("meta_events", [])
            if transcript:
                st.markdown("**Transcript**")
                for entry in transcript:
                    speaker = speakers.get(entry.get("role"))
                    if speaker is None:
                        continue
                    css_class, speaker_label = speaker
                    # Transcript text is untrusted and is rendered as raw HTML.
                    content = html.escape(str(entry.get("content", "")), quote=False)
                    st.markdown(
                        f'\n<div class="transcript-message {css_class}">\n'
                        f"<strong>{speaker_label}:</strong> {content}\n"
                        "</div>\n",
                        unsafe_allow_html=True,
                    )
            else:
                st.info("Transcript not yet available.")
            if meta_events:
                st.markdown("**Conversation Analytics (intent / next_action)**")
                import pandas as pd
                st.dataframe(pd.DataFrame(meta_events), use_container_width=True)


def render_campaigns_main(campaigns, voice_profiles):
    """Render the main Campaign Manager area.

    With no campaign selected, the "Create New Campaign" form is shown.
    Otherwise the selected campaign is shown in three tabs: Settings,
    Contacts and Call History.

    Args:
        campaigns: Campaign dicts as returned by ``api_list_campaigns``.
        voice_profiles: Voice profile dicts; only those with status "active"
            and a ``provider_voice_id`` are offered as cloned-voice choices.
    """
    cid = st.session_state.selected_campaign_id
    campaign = next((c for c in campaigns if c["id"] == cid), None) if cid else None

    # Label -> profile id for every cloned-voice selectbox; "None" = no override.
    vp_label_to_id = {"None": None}
    for v in voice_profiles:
        if v.get("status") == "active" and v.get("provider_voice_id"):
            vp_label_to_id[f"{v['display_name']} ({v['id'][:6]})"] = v["id"]

    st.markdown("<h1 style='text-align:center;'>Campaign Manager</h1>", unsafe_allow_html=True)

    if not cid:
        # ---- Create new campaign form ----
        _render_create_campaign_form(vp_label_to_id)
        return

    if campaign is None:
        # The remembered id is not in the list (deleted, or the list could
        # not be fetched); without this guard the tabs below would crash.
        st.warning("The selected campaign could not be loaded. It may have been deleted or the server may be unreachable.")
        return

    # ---- Campaign detail tabs ----
    tab_settings, tab_contacts, tab_calls = st.tabs(["Settings", "Contacts", "Call History"])
    with tab_settings:
        _render_campaign_settings_tab(cid, campaign, vp_label_to_id)
    with tab_contacts:
        _render_campaign_contacts_tab(cid, vp_label_to_id)
    with tab_calls:
        _render_campaign_calls_tab(cid)
# Sidebar content: branding, call counters, mode switch, voice library and then
# either the single-call controls or the campaign sidebar.
with st.sidebar:
# Branding header (the f-string has no placeholders; the HTML is static).
st.markdown(f"""
<div style="text-align: center; margin-bottom: 20px;">
<h1 class="logo-text" style="font-size: 2.2em;">VoiceGenius</h1>
<p style="opacity: 0.7; margin-top: -10px;">AI-Powered Phone Calls</p>
</div>
""", unsafe_allow_html=True)
st.divider()
# Call stats display: the two counters are read from
# st.session_state.total_calls and st.session_state.successful_calls.
col1, col2 = st.columns(2)
with col1:
st.markdown("""
<div class="stat-card">
<h3 style="margin: 0; font-size: 1.8rem; color: #2e7ef7;">📊</h3>
<h3 style="margin: 0;">Total Calls</h3>
<p style="font-size: 1.5rem; font-weight: bold;">{}</p>
</div>
""".format(st.session_state.total_calls), unsafe_allow_html=True)
with col2:
st.markdown("""
<div class="stat-card">
<h3 style="margin: 0; font-size: 1.8rem; color: #4CAF50;">✅</h3>
<h3 style="margin: 0;">Completed</h3>
<p style="font-size: 1.5rem; font-weight: bold;">{}</p>
</div>
""".format(st.session_state.successful_calls), unsafe_allow_html=True)
st.divider()
# Top-level mode selector. The preselected option comes from
# st.session_state.app_mode, and the chosen value is written back to it.
app_mode = st.radio(
"Mode",
options=["Single Call", "Campaigns"],
index=0 if st.session_state.app_mode == "Single Call" else 1,
horizontal=True,
key="app_mode_radio",
)
st.session_state.app_mode = app_mode
st.divider()
# Keep whatever render_voice_library_panel() returns; the cached value is
# later passed to render_campaigns_main().
st.session_state.voice_profiles_cache = render_voice_library_panel()
st.divider()
# Default values (overridden in Single Call branch)
call_mode = "AI Call"
phone_number = ""
operator_number = ""
start_call = False
end_call = False
if app_mode == "Single Call":
# Call mode selection
call_mode = st.radio(
"Call Mode",
options=["AI Call", "Manual Call"],
index=0,
help="AI Call: AI agent handles the conversation. Manual Call: You speak directly via your phone.",
horizontal=True
)
st.divider()
# Phone number input (target to call); prefilled from the YOUR_NUMBER
# environment variable when it is set.
phone_number = st.text_input(
"Target Phone Number",
placeholder="+1XXXXXXXXXX",
value=os.getenv("YOUR_NUMBER") or "",
help="Enter the phone number to call in international format"
)
# Operator phone number (only for manual call mode); prefilled from the
# OPERATOR_NUMBER environment variable when it is set.
if call_mode == "Manual Call":
operator_number = st.text_input(
"Your Phone Number (Operator)",
placeholder="+1XXXXXXXXXX",
value=os.getenv("OPERATOR_NUMBER") or "",
help="Your phone number — Twilio will call you first, then connect you to the target"
)
else:
operator_number = ""
# Remaining Single Call controls; the matching `else` further down handles
# the Campaigns sidebar.
if app_mode == "Single Call":
# Language selection: display name -> language code.
language_options = {
"English": "en",
"Spanish": "es"
}
selected_language = st.selectbox(
"Call Language",
options=list(language_options.keys()),
index=0,
help="Select the language for the conversation"
)
# Both names hold the same code: language_code goes into the start-call
# payload, selected_lang_code selects the preset voice list below.
language_code = language_options[selected_language]
selected_lang_code = language_options[selected_language]
# AI-only settings (hidden in Manual Call mode)
if call_mode == "AI Call":
# Model selection: display name -> model code.
model_options = {
"OpenAI GPT-4o": "openai",
"Anthropic Claude": "anthropic"
}
selected_model = st.selectbox(
"AI Model",
options=list(model_options.keys()),
index=0,
help="Select the AI language model to use"
)
model_code = model_options[selected_model]
# Both branches assign the same value, so the current selection is always
# stored in session state.
if 'model_selection' not in st.session_state:
st.session_state.model_selection = model_code
else:
st.session_state.model_selection = model_code
voice_source = st.radio(
"Voice Source",
options=["Preset Voice", "Voice Library"],
index=0,
horizontal=True,
help="Preset Voice uses built-in mapped voices. Voice Library uses your ElevenLabs clones."
)
if voice_source == "Preset Voice":
# Preset voices for the chosen language, falling back to the "en" list
# when voice_options has no entry for that language code.
available_voices = voice_options.get(selected_lang_code, voice_options["en"])
selected_voice_name = st.selectbox(
"Voice",
options=list(available_voices.keys()),
index=0,
help="Select the voice for the AI assistant"
)
selected_voice_id = available_voices[selected_voice_name]
# Exactly one of voice_id / voice_profile_id is set; the other is cleared.
st.session_state.voice_id = selected_voice_id
st.session_state.voice_profile_id = None
else:
usable_profiles = api_list_voice_profiles(usable_only=True)
if not usable_profiles:
# No library voice to offer: warn and fall back to a preset voice.
st.warning("No active voice profiles found. Create one in Voice Library.")
available_voices = voice_options.get(selected_lang_code, voice_options["en"])
selected_voice_name = st.selectbox(
"Fallback Preset Voice",
options=list(available_voices.keys()),
index=0,
)
selected_voice_id = available_voices[selected_voice_name]
st.session_state.voice_id = selected_voice_id
st.session_state.voice_profile_id = None
else:
# Label each profile "<display_name> (<first 6 characters of id>)" and map
# the label back to the full profile id.
voice_profile_map = {
f"{v.get('display_name')} ({v.get('id')[:6]})": v.get("id")
for v in usable_profiles
}
voice_profile_labels = list(voice_profile_map.keys())
selected_voice_profile_label = st.selectbox(
"Cloned Voice",
options=voice_profile_labels,
index=0,
help="Use a voice from your voice library for this call.",
)
st.session_state.voice_profile_id = voice_profile_map[selected_voice_profile_label]
st.session_state.voice_id = None
st.divider()
with st.sidebar.expander("Voice Settings ", expanded=False):
st.markdown("### Customize Voice Parameters")
# Seed the default voice parameters the first time this code runs in a session.
if 'voice_settings' not in st.session_state:
st.session_state.voice_settings = {
"stability": 0.5,
"similarity_boost": 0.75,
"style": 0.0,
"use_speaker_boost": True,
"speed": 1.0
}
# Each widget starts from the stored value and writes its result straight
# back into st.session_state.voice_settings.
st.session_state.voice_settings["stability"] = st.slider(
"Stability", min_value=0.0, max_value=1.0,
value=st.session_state.voice_settings.get("stability", 0.5), step=0.05,
help="Higher values make the voice more consistent between re-generations but can reduce expressiveness."
)
st.session_state.voice_settings["similarity_boost"] = st.slider(
"Similarity Boost", min_value=0.0, max_value=1.0,
value=st.session_state.voice_settings.get("similarity_boost", 0.75), step=0.05,
help="Higher values make the voice more similar to the original voice but can reduce quality."
)
st.session_state.voice_settings["style"] = st.slider(
"Style", min_value=0.0, max_value=1.0,
value=st.session_state.voice_settings.get("style", 0.0), step=0.05,
help="Higher values amplify unique speaking style of the cloned voice."
)
st.session_state.voice_settings["speed"] = st.slider(
"Speed", min_value=0.7, max_value=1.2,
value=st.session_state.voice_settings.get("speed", 1.0), step=0.01,
help="Adjust the speaking speed of the voice."
)
st.session_state.voice_settings["use_speaker_boost"] = st.checkbox(
"Speaker Boost",
value=st.session_state.voice_settings.get("use_speaker_boost", True),
help="Improves voice clarity and target speaker similarity."
)
# Restore the same defaults used for seeding, then rerun the script.
if st.button("Reset to Defaults"):
st.session_state.voice_settings = {
"stability": 0.5, "similarity_boost": 0.75,
"style": 0.0, "use_speaker_boost": True, "speed": 1.0
}
st.rerun()
st.divider()
# Status indicator: CSS class and text both follow st.session_state.call_active.
status_class = "status-active" if st.session_state.call_active else "status-inactive"
status_text = "Call in progress" if st.session_state.call_active else "Ready to call"
st.markdown(f"""
<div style="display: flex; align-items: center; margin-bottom: 15px;">
<span class="status-indicator {status_class}"></span>
<span>{status_text}</span>
</div>
""", unsafe_allow_html=True)
# Call controls: Start is disabled while a call is active or a start request
# is in flight; End is disabled unless a call is active.
call_col1, call_col2 = st.columns(2)
with call_col1:
start_call = st.button(
"📞 Start Call",
disabled=st.session_state.call_active or st.session_state.start_call_in_flight,
use_container_width=True
)
with call_col2:
end_call = st.button("🔴 End Call", disabled=not st.session_state.call_active, use_container_width=True)
st.divider()
# Call history: the transcript list is re-fetched each time this code runs.
st.subheader("Call History")
st.session_state.all_transcripts = fetch_all_transcripts()
# Options are "Current Call" followed by "[<CALL_TYPE>] Call <call_sid>" for
# every fetched transcript; call_type defaults to 'ai' when absent.
call_history_options = ["Current Call"]
for t in st.session_state.all_transcripts:
ctype = t.get('call_type', 'ai').upper()
call_history_options.append(f"[{ctype}] Call {t['call_sid']}")
st.selectbox(
"Select a call",
options=call_history_options,
key="call_selector",
index=0,
disabled=st.session_state.call_active,
on_change=on_call_selector_change
)
# Manual refresh: fetch the list again and invoke the selector callback directly.
if st.button("🔄 Refresh Calls", use_container_width=True):
try:
st.session_state.all_transcripts = fetch_all_transcripts()
on_call_selector_change()
except requests.RequestException as e:
st.error(f"Error fetching call list: {str(e)}")
else: # Campaigns mode sidebar
# Cache the campaign list in session state; render_campaigns_main() reads it later.
st.session_state.campaigns_cache = api_list_campaigns()
render_campaigns_sidebar(st.session_state.campaigns_cache)
# Campaigns mode owns the whole main area: render it from the cached campaign
# and voice-profile data, then stop so the single-call page below is not drawn.
campaigns_selected = st.session_state.app_mode == "Campaigns"
if campaigns_selected:
    render_campaigns_main(
        st.session_state.campaigns_cache,
        st.session_state.voice_profiles_cache,
    )
    st.stop()
# Page heading for the single-call view.
page_heading_html = "<h1 style='text-align: center;'>AI Voice Calling Assistant</h1>"
st.markdown(page_heading_html, unsafe_allow_html=True)

if call_mode != "AI Call":
    # Manual Call mode info
    st.info("**Manual Call Mode** — Twilio will call your phone first. When you pick up, it connects you to the target number. You talk directly — no AI involved.")
    st.divider()
else:
    # Prompt template selection (AI mode only)
    st.subheader("Select Prompt Template")
    # One row per card: CSS modifier, heading, blurb, button label, button key,
    # and the template name handed to apply_template().
    template_cards = (
        ("prompt-medical", "🩺 Medical Office", "Schedule appointments and collect patient information", "Select Medical", "medical_btn", "medical"),
        ("prompt-finance", "💹 Financial Services", "Schedule consultations and discuss available services", "Select Finance", "finance_btn", "finance"),
        ("prompt-sports", "🏋️ Sports & Fitness", "Discuss gym memberships and class schedules", "Select Sports", "sports_btn", "sports"),
        ("prompt-custom", "✨ Customer Service", "Follow up on purchases and customer satisfaction", "Select Customer Service", "custom_btn", "customer_service"),
    )
    card_columns = st.columns(4)
    for card_column, card_spec in zip(card_columns, template_cards):
        css_modifier, heading, blurb, button_label, button_key, template_name = card_spec
        with card_column:
            card_element = st.markdown(f"""
<div class="prompt-card {css_modifier}">
<h3>{heading}</h3>
<p>{blurb}</p>
</div>
""", unsafe_allow_html=True)
            # The button is only drawn when st.markdown returned a truthy handle.
            if card_element and st.button(button_label, key=button_key, use_container_width=True):
                apply_template(template_name)
    st.divider()
    # Custom prompt inputs in an expandable section; it opens by default while
    # no template has been selected.
    no_template_chosen = st.session_state.selected_template is None
    with st.expander("Customize AI Instructions", expanded=no_template_chosen):
        # Both text areas are locked for the duration of an active call.
        system_text = st.text_area(
            "System Instructions (AI's role and guidelines)",
            value=st.session_state.system_message,
            disabled=st.session_state.call_active,
            height=100,
        )
        st.session_state.system_message = system_text
        opening_text = st.text_area(
            "Initial Message (First thing the AI will say)",
            value=st.session_state.initial_message,
            disabled=st.session_state.call_active,
            height=100,
        )
        st.session_state.initial_message = opening_text
    st.divider()
# Handle call actions
def _clear_start_request():
    """Mark the pending start-call attempt as over so a new one can be made."""
    st.session_state.start_call_in_flight = False
    st.session_state.start_call_request_id = None


def _await_call_connection(call_sid, waiting_label, connected_message):
    """Poll the backend once per second until the call connects or gives up.

    Args:
        call_sid: Identifier returned by the start-call endpoint.
        waiting_label: Text shown while polling; the elapsed seconds are appended.
        connected_message: Text shown once the status becomes 'in-progress'.

    On 'in-progress' the session is marked as having an active call and the
    script is rerun. On a terminal status, or after 60 polls, an error is
    shown and the pending start request is cleared.

    Raises:
        requests.RequestException: if a status request fails or times out.
        ValueError: if a status response body is not valid JSON.
    """
    status_url = f"https://{os.getenv('SERVER')}/call_status/{call_sid}"
    progress_bar = st.progress(0)
    connection_status = st.empty()
    for i in range(60):
        # The bar fills over the first 30 polls and then stays full.
        progress_bar.progress(min(i / 30, 1.0))
        connection_status.info(f"{waiting_label} ({i+1}s)")
        time.sleep(1)
        # Bounded wait: without a timeout a single stalled request would block
        # this script run (and therefore the whole page) indefinitely.
        status = requests.get(status_url, timeout=10).json().get('status')
        if status == 'in-progress':
            progress_bar.progress(1.0)
            connection_status.success(connected_message)
            st.session_state.call_active = True
            st.session_state.start_call_in_flight = False
            st.session_state.should_reset_selector = True
            time.sleep(1)
            st.rerun()
            return
        if status in ('completed', 'failed', 'busy', 'no-answer'):
            progress_bar.empty()
            connection_status.error(f"Call ended: {status}")
            _clear_start_request()
            return
    progress_bar.empty()
    connection_status.error("Timeout waiting for call to connect.")
    _clear_start_request()


if start_call and phone_number:
    if call_mode == "Manual Call" and not operator_number:
        st.warning("Please enter your phone number (Operator) for Manual Call mode.")
    else:
        # Reuse an existing request id if one is still pending; otherwise mint one.
        if not st.session_state.start_call_request_id:
            st.session_state.start_call_request_id = uuid.uuid4().hex
        st.session_state.start_call_in_flight = True

        if call_mode == "Manual Call":
            # Manual Call: bridge operator phone to target number
            endpoint = "start_manual_call"
            spinner_text = f"📞 Calling your phone {operator_number}, then connecting to {phone_number}..."
            waiting_label = "Ringing your phone..."
            connected_message = "Connected! You are now talking to the target number."
            failure_prefix = "Failed to initiate manual call"
            start_call_payload = {
                "to_number": phone_number,
                "operator_number": operator_number,
            }
        else:
            # AI Call: the AI agent handles the conversation
            endpoint = "start_call"
            spinner_text = f"📞 Calling {phone_number}..."
            waiting_label = "Establishing connection..."
            connected_message = "Call connected!"
            failure_prefix = "Failed to initiate call"
            start_call_payload = {
                "to_number": phone_number,
                "system_message": st.session_state.system_message,
                "initial_message": st.session_state.initial_message,
                "language": language_code,
                "model": st.session_state.model_selection,
                "voice_settings": json.dumps(st.session_state.voice_settings),
                "request_id": st.session_state.start_call_request_id,
            }
            # Only the voice selector that is actually set is sent.
            if st.session_state.voice_id:
                start_call_payload["voice_id"] = st.session_state.voice_id
            if st.session_state.voice_profile_id:
                start_call_payload["voice_profile_id"] = st.session_state.voice_profile_id

        with st.spinner(spinner_text):
            try:
                response = requests.post(
                    f"https://{os.getenv('SERVER')}/{endpoint}",
                    json=start_call_payload,
                    timeout=10,
                )
                call_data = response.json()
                if call_sid := call_data.get('call_sid'):
                    st.session_state.call_sid = call_sid
                    st.session_state.transcript = []
                    _await_call_connection(call_sid, waiting_label, connected_message)
                else:
                    st.error(f"{failure_prefix}: {call_data}")
                    _clear_start_request()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a non-JSON response body on requests
                # versions whose JSON error is not a RequestException.
                st.error(f"Error: {str(e)}")
                _clear_start_request()
elif start_call:
    st.warning("Please enter a valid phone number.")
# End the active call when the "End Call" button was pressed in this run.
if end_call:
    try:
        with st.spinner("Ending call..."):
            # Bounded wait: without a timeout a stalled backend would leave the
            # spinner (and the whole script run) hanging indefinitely.
            response = requests.post(
                f"https://{os.getenv('SERVER')}/end_call",
                json={"call_sid": st.session_state.call_sid},
                timeout=10,
            )
            if response.status_code == 200:
                st.success("Call ended successfully.")
                # Reset all per-call state so a new call can be started.
                st.session_state.call_active = False
                st.session_state.call_sid = None
                st.session_state.start_call_in_flight = False
                st.session_state.start_call_request_id = None
                time.sleep(1)
                st.rerun()
            else:
                st.error(f"Failed to end call: {response.text}")
    except requests.RequestException as e:
        # Includes requests.Timeout raised by the bounded wait above.
        st.error(f"Error ending call: {str(e)}")
# Display call transcript or recording
import html  # local import: only needed to escape transcript text in this section


def _render_transcript_entries(entries):
    """Render transcript entries as chat bubbles.

    Args:
        entries: Iterable of dicts with a 'role' ('user' or 'assistant') and a
            'content' string. Entries with any other role are skipped.

    The content is HTML-escaped before it is placed in the markup, because the
    bubbles are rendered with unsafe_allow_html=True and the text originates
    from the phone conversation rather than from this app.
    """
    bubble_styles = {
        "user": ("user-message", "Caller"),
        "assistant": ("assistant-message", "AI"),
    }
    for entry in entries:
        bubble = bubble_styles.get(entry.get("role"))
        if bubble is None:
            continue
        css_class, speaker = bubble
        safe_text = html.escape(str(entry.get("content", "")))
        st.markdown(f"""
<div class="transcript-message {css_class}">
<strong>{speaker}:</strong> {safe_text}
</div>
""", unsafe_allow_html=True)


st.subheader("Call Transcript")
# Call Recording display: only for a past call that has recording info loaded.
if st.session_state.call_selector != "Current Call" and st.session_state.recording_info:
    st.markdown(f"""
<div class="call-card">
<h3>📞 Call Recording</h3>
<p>Duration: {format_duration(st.session_state.recording_info['duration'])}</p>
</div>
""", unsafe_allow_html=True)
    st.audio(st.session_state.recording_info['url'], format="audio/mp3", start_time=0)
# Display transcript in chat-like format
transcript_container = st.container()
with transcript_container:
    if st.session_state.call_active and st.session_state.call_sid:
        # Live call: show the transcript kept in session state.
        _render_transcript_entries(st.session_state.transcript)
    elif st.session_state.call_selector != "Current Call":
        # Past call: look up its transcript by the call SID from the selector label.
        _sid = _extract_call_sid(st.session_state.call_selector)
        if transcript := next((t for t in st.session_state.all_transcripts if t['call_sid'] == _sid), None):
            _render_transcript_entries(transcript['transcript'])
    else:
        st.info("No call transcript available. Start a call or select a previous call from the sidebar.")
# Live call updates
if st.session_state.call_active:
    def update_call_info():
        """Refresh the status and transcript of the active call.

        Returns:
            True when the call is still 'in-progress' or 'ringing' and the
            transcript in session state was refreshed; False when the call has
            ended or the backend could not be queried.
        """
        server_url = f"https://{os.getenv('SERVER')}"
        active_sid = st.session_state.call_sid
        try:
            # Bounded waits: this runs on every one-second refresh, so a stalled
            # request without a timeout would freeze the page.
            status = requests.get(
                f"{server_url}/call_status/{active_sid}", timeout=10
            ).json().get('status')
            if status not in ['in-progress', 'ringing']:
                st.session_state.call_active = False
                st.warning(f"Call ended: {status}")
                return False
            transcript_data = requests.get(
                f"{server_url}/transcript/{active_sid}", timeout=10
            ).json()
            if transcript_data.get('call_ended', False):
                st.session_state.call_active = False
                st.info(f"Call ended. Status: {transcript_data.get('final_status', 'Unknown')}")
                return False
            st.session_state.transcript = transcript_data.get('transcript', [])
            return True
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON response body on requests versions
            # whose JSON error is not a RequestException.
            st.error(f"Error updating call info: {str(e)}")
            return False

    if update_call_info():
        # Still live: poll again in one second.
        time.sleep(1)
        st.rerun()
    else:
        # Call is over (or unreachable): clear all per-call state, then rerun.
        st.session_state.call_active = False
        st.session_state.call_sid = None
        st.session_state.start_call_in_flight = False
        st.session_state.start_call_request_id = None
        st.info("Call has ended. You can start a new call if needed.")
        time.sleep(1)
        st.rerun()
# Footer: centred, dimmed product name and version line at the bottom of the page.
footer_lines = [
    "",
    '<div style="text-align: center; margin-top: 50px; opacity: 0.7;">',
    "<p>VoiceGenius AI Calling Platform • v2.0.3</p>",
    "</div>",
    "",
]
st.markdown("\n".join(footer_lines), unsafe_allow_html=True)