# app/gradio_app.py
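"""
Gradio UI for the FutureCafe Call/SMS agent (MVP).

Left column: voice call lane (mic recording → ASR → LLM reply → TTS autoplay).
Right column: SMS/chat lane (typed text → LLM reply).
Each lane also exposes a JSON diagnostics panel (intent/slots/tool/latency).
"""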
from __future__ import annotations

from typing import List, Dict, Any, Tuple
import os, time, shutil, uuid

import gradio as gr

try:
    from models.asr_whisper import get_asr
except Exception:
    get_asr = None

try:
    from models.llm_chat import respond_chat as llm_respond_chat
except Exception:
    llm_respond_chat = None

from models.tts_router import (
    tts_synthesize,
    ensure_runtime_audio_dir,
    cleanup_old_audio,
)
# =============================================================================
# Helpers (pure, modular)
# =============================================================================

def _safe_llm_reply(history: List[Dict[str, str]], user_text: str) -> str:
    """
    Try the local LLM. If it's missing or errors, log loudly and return a safe fallback.
    """
    if llm_respond_chat is None:
        print("[LLM] respond_chat not imported; using fallback.")
        return "Hello! How can I assist you today? Would you like to place an order or inquire about the menu?"
    try:
        bot_text, _guard, _diag = llm_respond_chat(history or [], user_text, {})
        if isinstance(bot_text, str) and bot_text.strip():
            print("[LLM] returned:", bot_text[:120].replace("\n", " "))
            return bot_text.strip()
        else:
            print("[LLM] empty/invalid response; using fallback.")
    except Exception as e:
        import traceback
        print("[LLM] error -> fallback:", repr(e))
        traceback.print_exc()
    return "Hello! How can I assist you today? Would you like to place an order or inquire about the menu?"

def _persist_copy(src_path: str) -> str | None:
    """Copy a mic recording into runtime/audio with a stable filename and return the new path."""
    if not (src_path and os.path.exists(src_path)):
        return None
    audio_dir = ensure_runtime_audio_dir()  # ask tts_router for the proper dir
    dst = os.path.join(audio_dir, f"user_{uuid.uuid4().hex}.wav")
    shutil.copyfile(src_path, dst)
    return dst

def _asr_transcribe(aud_path: str) -> str:
    """
    Transcribe audio to text. If ASR is unavailable, return a safe message.
    """
    if not aud_path:
        return "(no audio)"
    if get_asr is None:
        return "(ASR unavailable)"
    try:
        asr = get_asr()
        out = asr.transcribe(aud_path)
        return (out.get("text") or "").strip() or "(no speech detected)"
    except Exception as e:
        print("[ASR] error:", e)
        return "(transcription failed)"

def _tts_from_text(text: str) -> str | None:
    """
    Synthesize assistant text to a WAV in runtime/audio.
    Returns a file path or None.
    """
    if not (text and text.strip()):
        return None
    path = tts_synthesize(text.strip())
    if path and os.path.exists(path):
        return path
    # Always attempt one more minimal fallback to avoid returning an empty path.
    return tts_synthesize("How can I help with FutureCafe?")

def _append_chat(history: List[Dict[str, str]] | None,
                 role: str, content: str) -> List[Dict[str, str]]:
    hist = list(history or [])
    hist.append({"role": role, "content": content})
    return hist

def _startup_clean_runtime_audio():
    """
    On app start, clean previous session audio artifacts.
    """
    audio_dir = ensure_runtime_audio_dir()
    try:
        for name in os.listdir(audio_dir):
            p = os.path.join(audio_dir, name)
            if os.path.isfile(p):
                os.remove(p)
    except Exception as e:
        print("[RUNTIME] Cannot clean runtime/audio:", e)

# =============================================================================
# Voice handlers (modular)
# =============================================================================

def handle_voice_turn(
    user_audio_path: str,
    voice_history: List[Dict[str, str]] | None,
) -> Tuple[List[Dict[str, str]], str | None, Dict[str, Any]]:
    """
    Single voice turn:
      1) Transcribe user audio
      2) Ask the LLM for a reply (text)
      3) TTS the reply to a WAV
      4) Append both transcript and assistant text to the voice chat history
    Returns: (new_voice_history, assistant_audio_path, diag_json)
    """
    t0 = time.time()
    transcript = _asr_transcribe(user_audio_path)
    hist1 = _append_chat(voice_history, "user", transcript)
    bot_text = _safe_llm_reply(hist1, transcript)
    hist2 = _append_chat(hist1, "assistant", bot_text)
    tts_path = _tts_from_text(bot_text)
    diag = {
        "intent": None,
        "slots": {},
        "tool_selected": None,
        "tool_result": {
            "transcript": transcript,
            "llm_response": bot_text,
        },
        "latency_ms": int((time.time() - t0) * 1000),
    }
    return hist2, tts_path, diag

# =============================================================================
# Text handlers (modular)
# =============================================================================

def handle_text_turn(
    user_text: str,
    chat_history: List[Dict[str, str]] | None,
) -> Tuple[List[Dict[str, str]], Dict[str, Any], str]:
    """
    Single text turn:
      1) Append user text
      2) Ask the LLM for a reply
      3) Append assistant text
      4) Prepare diagnostics
    Returns: (new_chat_history, diag_json, clear_text_value)
    """
    t0 = time.time()
    user_text = (user_text or "").strip()
    if not user_text:
        empty_diag = {"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0}
        return (chat_history or []), empty_diag, ""
    hist1 = _append_chat(chat_history, "user", user_text)
    bot_text = _safe_llm_reply(hist1, user_text)
    hist2 = _append_chat(hist1, "assistant", bot_text)
    diag = {
        "intent": None,
        "slots": {},
        "tool_selected": None,
        "tool_result": {"user": user_text, "llm_response": bot_text},
        "latency_ms": int((time.time() - t0) * 1000),
    }
    return hist2, diag, ""

# =============================================================================
# Fixed UI (as requested) + wiring
# =============================================================================

def build_demo():
    """
    Fixed UI layout:
      LEFT (Voice Call):
        - voice_in (mic recorder)
        - assistant_audio (autoplay)
        - voice_chat (transcript chat)
        - call_diag (JSON)
      RIGHT (SMS/Chat):
        - chat_box
        - text_in (Enter to send)
        - chat_diag (JSON)
    """
    _startup_clean_runtime_audio()

    with gr.Blocks(title="FutureCafe Call/SMS Agent (MVP)") as demo:
        gr.Markdown("### ☎️ FutureCafe AI Agent (MVP)\n**Call (voice)** on the left · **SMS/Chat** on the right")

        # States
        voice_state = gr.State([])  # list of {"role", "content"} dicts for the voice transcript chat
        chat_state = gr.State([])   # list of {"role", "content"} dicts for the SMS chat

        with gr.Row():
            # ---------------- LEFT: VOICE ----------------
            with gr.Column(scale=1, min_width=430):
                gr.Markdown("#### 📞 Voice Call")
                voice_in = gr.Audio(
                    label="Press Record → Speak → Stop (auto-sends)",
                    sources=["microphone"],
                    type="filepath",
                    format="wav",
                    interactive=True,
                    editable=False,
                    waveform_options={"show_recording_waveform": True},
                )
                assistant_audio = gr.Audio(
                    label="Assistant Response (auto-play)",
                    autoplay=True,
                    type="filepath",
                    interactive=False,
                )
                voice_chat = gr.Chatbot(value=[], type="messages", height=220, label="Voice Chat (transcripts)")
                call_diag = gr.JSON(
                    value={"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0},
                    label="Voice Diagnostics",
                )

            # ---------------- RIGHT: SMS / CHAT ----------------
            with gr.Column(scale=1, min_width=430):
                gr.Markdown("#### 💬 SMS / Chat")
                chat_box = gr.Chatbot(value=[], type="messages", height=360, label=None)
                text_in = gr.Textbox(
                    placeholder="Type here… e.g., “Any vegan pizzas?”, “Book a table for 2 at 7.” (Enter to send)",
                    label=None, lines=1,
                )
                chat_diag = gr.JSON(
                    value={"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0},
                    label="Chat Diagnostics",
                )

        # ---------- Handlers (thin wrappers around the modular functions) ----------
        def _clear_recorder():
            # Only clears the recorder input; leaves assistant audio + transcripts intact.
            return gr.update(value=None, interactive=True)

        def on_voice_change(aud_path: str | None, voice_hist: list[dict] | None):
            import sys

            def log(*a):
                print("[VOICE]", *a, file=sys.stderr, flush=True)

            empty_diag = {"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0}
            if not aud_path:
                return (voice_hist or []), None, empty_diag  # keep the output shapes correct

            t0 = time.time()

            # 1) Optionally copy the mic clip into runtime/audio (see _persist_copy);
            #    ASR can read the original path directly, so the copy is not required.
            stable_user = aud_path
            log("input:", stable_user)

            # 2) ASR
            transcript = "(transcription failed)"
            if get_asr is None:
                transcript = "(ASR unavailable)"
            else:
                try:
                    asr = get_asr()
                    out = asr.transcribe(stable_user)
                    transcript = (out.get("text") or "").strip() or "(no speech detected)"
                except Exception as e:
                    log("ASR error:", e)

            # 3) LLM (prefer the voice-specific responder; fall back to the generic one)
            try:
                from models.llm_chat import respond_chat_voice
            except Exception:
                from models.llm_chat import respond_chat as respond_chat_voice
            bot_text, _new_policy, policy_diag = respond_chat_voice(voice_hist or [], transcript, {})
            log("transcript:", transcript)
            log("bot_text:", bot_text)

            # 4) TTS
            new_tts = tts_synthesize(bot_text)
            log("tts_out:", new_tts, "exists:", os.path.exists(new_tts) if new_tts else None)

            # 5) Update the voice transcript chat (text)
            new_hist = (voice_hist or []) + [
                {"role": "user", "content": transcript},
                {"role": "assistant", "content": bot_text},
            ]
            diag = {
                "intent": policy_diag.get("policy") if isinstance(policy_diag, dict) else None,
                "slots": {},
                "tool_selected": None,
                "tool_result": {"transcript": transcript, "llm_response": bot_text, "policy": policy_diag},
                "latency_ms": int((time.time() - t0) * 1000),
            }
            # Return exactly what the outputs expect (don't clear assistant audio here).
            return new_hist, new_tts, diag

        def on_text_send(txt: str, hist: List[Dict[str, str]]):
            new_hist, diag, clear_text = handle_text_turn(txt, hist or [])
            return new_hist, diag, clear_text

        # ---------- Wiring ----------
        # Voice lane: update (voice_chat, assistant_audio, call_diag). Do NOT clear the recorder in
        # the same event; it is cleared afterwards so autoplay is not interrupted.
        # Fire on explicit Stop when the Gradio version supports it; otherwise fall back to a generic change event.
        rec_event = getattr(voice_in, "stop_recording", None)
        if callable(rec_event):
            rec_event(
                on_voice_change,
                inputs=[voice_in, voice_state],
                outputs=[voice_chat, assistant_audio, call_diag],
                api_name="chat_voice",
            ).then(
                _clear_recorder,  # runs AFTER the outputs are set, so autoplay isn't interrupted
                inputs=None,
                outputs=[voice_in],
            )
        else:
            voice_in.change(
                on_voice_change,
                inputs=[voice_in, voice_state],
                outputs=[voice_chat, assistant_audio, call_diag],
                api_name="chat_voice",
            ).then(
                _clear_recorder,
                inputs=None,
                outputs=[voice_in],
            )

        # Keep voice_state in sync with what's shown in voice_chat
        voice_chat.change(lambda x: x, inputs=[voice_chat], outputs=[voice_state])

        # Text lane: Enter to send
        text_in.submit(
            on_text_send,
            inputs=[text_in, chat_state],
            outputs=[chat_box, chat_diag, text_in],
            api_name="chat_text",
        )

        # Keep chat_state in sync with what's shown in chat_box
        chat_box.change(lambda x: x, inputs=[chat_box], outputs=[chat_state])

    return demo
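
# Minimal local-launch sketch (assumption): the Space may already have its own entrypoint that
# imports build_demo(); if this file is run directly, the stub below starts the UI with default
# Gradio server settings.
if __name__ == "__main__":
    demo = build_demo()
    demo.launch()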