import gradio as gr
import os
import time
import re
import random
import torch
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from typing import List, Dict, Any, Tuple, Optional
from PIL import Image
from transformers import pipeline
from gtts import gTTS
from diffusers import StableDiffusionPipeline
from docx import Document
from pptx import Presentation
from io import BytesIO
import numpy as np

# --- CONFIGURATION & INITIALIZATION ---
# Use string 'cpu' or a GPU index string/int like '0'
USER_DEVICE = "cpu"  # keep as "cpu" on CPU-only hosts; change to "0" for GPU 0
PIPELINE_DEVICE = -1 if str(USER_DEVICE).lower() == "cpu" else int(USER_DEVICE)
TORCH_DEVICE = torch.device("cuda") if torch.cuda.is_available() and PIPELINE_DEVICE != -1 else torch.device("cpu")
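# Illustrative mapping (given the defaults above, not executed): USER_DEVICE = "cpu" gives
# PIPELINE_DEVICE = -1 (transformers' CPU convention) and TORCH_DEVICE = cpu;
# USER_DEVICE = "0" gives PIPELINE_DEVICE = 0 and TORCH_DEVICE = cuda when
# torch.cuda.is_available() is True, otherwise it falls back to cpu.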
os.environ['GRADIO_ANALYTICS_ENABLED'] = 'False'
AUDIO_DIR = "audio_outputs"
DOC_DIR = "doc_outputs"
if not os.path.exists(AUDIO_DIR):
    os.makedirs(AUDIO_DIR)
if not os.path.exists(DOC_DIR):
    os.makedirs(DOC_DIR)

REPO_ID = "cosmosai471/Luna-v3"
MODEL_FILE = "luna.gguf"
LOCAL_MODEL_PATH = MODEL_FILE

SYSTEM_PROMPT = (
    "You are Luna, a helpful and friendly AI assistant. For internal tracing you may place Intent/Confidence tags, "
    "but DO NOT expose these tags in the user-facing response. Any Intent/Confidence/Action metadata must be kept internal."
)

# --- TUNABLES / GUARDS ---
CONFIDENCE_THRESHOLD = 30    # trigger web-search fallback only under this confidence
STREAM_CHAR_LIMIT = 35000    # cap streaming characters
STREAM_ITER_LIMIT = 20000    # cap streaming iterations
MIN_MEANINGFUL_LENGTH = 20   # min length for file-generation prompts
IMAGE_MAX_SIDE = 1024        # resize images to this max side before sending to the image pipeline
# safe destructor for Llama objects
def safe_del(self):
    try:
        if hasattr(self, "close") and callable(self.close):
            self.close()
    except Exception:
        pass

Llama.__del__ = safe_del
# --- MODEL LOADING ---
llm = None
try:
    print(f"Downloading {MODEL_FILE} from {REPO_ID}...")
    hf_hub_download(repo_id=REPO_ID, filename=MODEL_FILE, local_dir=".")
    if not os.path.exists(LOCAL_MODEL_PATH):
        raise FileNotFoundError(f"Download failed for {MODEL_FILE}")
    print("Initializing Llama...")
    llm = Llama(
        model_path=LOCAL_MODEL_PATH,
        n_ctx=8192,
        n_threads=4,
        n_batch=256,
        n_gpu_layers=0,
        verbose=False
    )
    print("✅ Luna model loaded successfully!")
except Exception as e:
    print(f"❌ Error loading Luna model: {e}")

    class DummyLLM:
        def create_completion(self, *args, **kwargs):
            yield {'choices': [{'text': '[Intent: qa_general][Confidence: 0] ERROR: Luna model failed to load. Check logs and resources.'}]}

    llm = DummyLLM()
# transformers' pipeline expects a device int: -1 for CPU
stt_pipe = None
try:
    stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=PIPELINE_DEVICE)
    print(f"✅ Loaded Whisper-base on device: {USER_DEVICE}")
except Exception as e:
    print(f"⚠️ Could not load Whisper. Voice chat disabled. Error: {e}")

image_pipe = None
try:
    VLM_MODEL_ID = "llava-hf/llava-1.5-7b-hf"
    image_pipe = pipeline("image-to-text", model=VLM_MODEL_ID, device=PIPELINE_DEVICE)
    print(f"✅ Loaded {VLM_MODEL_ID} for image processing (device={USER_DEVICE}).")
except Exception as e:
    print(f"⚠️ Could not load VLM ({VLM_MODEL_ID}). Image chat disabled. Error: {e}")

img_gen_pipe = None
try:
    img_gen_pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float32)
    img_gen_pipe.to(TORCH_DEVICE)
    print(f"✅ Loaded Stable Diffusion and moved to {TORCH_DEVICE}.")
except Exception as e:
    print(f"⚠️ Could not load Image Generation pipeline. Image generation disabled. Error: {e}")
# --- SANITIZERS & UTILITIES ---
def simulate_recording_delay():
    time.sleep(3)
    return None


def remove_bracketed_tags(text: str) -> str:
    """Remove bracketed tags like [Intent: ...] [Confidence: ...] exactly (safe)."""
    if not text:
        return ""
    text = re.sub(r'\[Intent:\s*[\w\-\_]+\]', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\[Confidence:\s*\d{1,3}\]', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\[Action:\s*[^\]]+\]', '', text, flags=re.IGNORECASE)
    return text
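# Illustrative example (not executed): only the exact bracketed metadata forms are stripped, e.g.
#   remove_bracketed_tags("[Intent: qa_general][Confidence: 87] Hi there!")
# leaves "Hi there!" (plus stray whitespace that collapse_whitespace() trims later),
# while ordinary bracketed text such as "[citation needed]" is left untouched.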
def remove_plain_tag_lines(text: str) -> str:
    """Remove whole lines that are just 'Intent: ...' or 'Confidence: ...', preserving inline content."""
    if not text:
        return ""
    text = re.sub(r'(?im)^\s*Intent\s*[:\-]\s*.*$', '', text)
    text = re.sub(r'(?im)^\s*Confidence\s*[:\-]\s*.*$', '', text)
    text = re.sub(r'(?im)^\s*Action\s*[:\-]\s*.*$', '', text)
    return text
def remove_word_number_dumps(text: str) -> str:
    """Remove big classifier dumps like 'greeting 99 2. goodbye 99' while trying to preserve normal text.

    Removes sequences where a word token is followed immediately by one to four small (1-3 digit) numbers,
    possibly repeated (likely classifier logs). Only removes them when they appear as standalone clusters
    (surrounded by line breaks or punctuation).
    """
    if not text:
        return ""
    # find clusters between line boundaries or punctuation
    cluster_pattern = re.compile(
        r'(?:\n|^|[\(\[\{\.;:,\-\|>])\s*([a-zA-Z_\-]{2,40}(?:\s+\d{1,3}){1,4}(?:\s+[a-zA-Z_\-]{2,40}(?:\s+\d{1,3}){1,4})*)\s*(?:\n|$|[\)\]\}\.;:,\-\|<])',
        flags=re.IGNORECASE
    )

    def _strip_cluster(m):
        return '\n'  # replace the cluster with a newline to preserve sentence boundaries

    text = cluster_pattern.sub(_strip_cluster, text)
    # remove leftover isolated numeric sequences (only small groups)
    text = re.sub(r'\b\d{2,3}(?:\s+\d{1,3})*\b', '', text)
    return text
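# Illustrative example (not executed): a standalone classifier dump such as
#   "greeting 99 goodbye 97\nHello! How can I help you today?"
# collapses to roughly "\nHello! How can I help you today?"; the follow-up pass
# also drops isolated 2-3 digit numbers left behind by such dumps.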
def collapse_whitespace(text: str) -> str:
    if not text:
        return ""
    text = re.sub(r'\n\s*\n+', '\n\n', text)
    text = re.sub(r'[ \t]{2,}', ' ', text)
    return text.strip()


def moderate_sanitize_for_ui(raw: str) -> str:
    """
    Moderate sanitizer: removes bracketed tags, whole tag-lines, and classifier dumps (carefully),
    but otherwise preserves natural language content.
    """
    if not raw:
        return ""
    s = raw
    s = remove_bracketed_tags(s)
    s = remove_plain_tag_lines(s)
    s = remove_word_number_dumps(s)
    s = collapse_whitespace(s)
    # final quick guard to remove the bare words 'Intent' or 'Confidence' if accidentally left behind
    s = re.sub(r'(?i)\bIntent\b', '', s)
    s = re.sub(r'(?i)\bConfidence\b', '', s)
    s = re.sub(r'(?i)\bAction\b', '', s)
    s = collapse_whitespace(s)
    return s.strip()
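# Illustrative end-to-end example (not executed):
#   moderate_sanitize_for_ui("[Intent: greeting][Confidence: 95] Hello! How can I help?")
# returns "Hello! How can I help?" - tags stripped, whitespace collapsed -
# while the natural-language sentence itself is preserved verbatim.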
# web-search stub
def web_search_tool(query: str) -> str:
    time.sleep(1.2)
    print(f"Simulating Google Search fallback for: {query}")
    return f"\n\n🔎 **Web Search Results for '{query}':** I found supplemental info to help answer this."


def check_confidence_and_augment(raw_response_with_tags: str, prompt: str) -> str:
    """
    Internal: parse the confidence if present (for logic only), but never display it. If the fallback is triggered,
    append web results to the sanitized response. Uses the moderate sanitizer to avoid eating valid content.
    """
    cleaned_for_logic = remove_bracketed_tags(raw_response_with_tags)
    confidence_match = re.search(r'\[Confidence:\s*([0-9]{1,3})\]', raw_response_with_tags, flags=re.IGNORECASE)
    if confidence_match:
        try:
            confidence_score = int(confidence_match.group(1))
            confidence_score = max(0, min(confidence_score, 100))
        except Exception:
            confidence_score = 0
    else:
        cleaned_no_tags = moderate_sanitize_for_ui(cleaned_for_logic)
        confidence_score = 10 if not cleaned_no_tags or len(cleaned_no_tags) < 30 else 85
    if confidence_score < CONFIDENCE_THRESHOLD:
        print(f"[internal] Low confidence ({confidence_score}%) detected -> using web fallback")
        supplement = web_search_tool(prompt)
        out = moderate_sanitize_for_ui(cleaned_for_logic)
        if not out:
            out = "I couldn't generate a reliable answer. " + moderate_sanitize_for_ui(supplement)
        else:
            out = out + "\n\n" + moderate_sanitize_for_ui(supplement)
    else:
        out = moderate_sanitize_for_ui(cleaned_for_logic)
    out = out or "Sorry, I couldn't produce a good answer. Could you rephrase or give more details?"
    return out
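# Illustrative behaviour (not executed), given CONFIDENCE_THRESHOLD = 30:
#   check_confidence_and_augment("[Confidence: 22] The capital of X is ...", "capital of X")
# parses 22, falls below the threshold and appends the simulated web-search supplement;
# a response tagged "[Confidence: 80]" (or an untagged answer longer than ~30 characters,
# which is scored 85 heuristically) is returned sanitized with no supplement.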
# --- IMAGE / VQA PROCESSING (robust + resize) ---
def _resize_image_keep_aspect(img: Image.Image, max_side: int) -> Image.Image:
    w, h = img.size
    if max(w, h) <= max_side:
        return img
    scale = max_side / float(max(w, h))
    new_w = int(w * scale)
    new_h = int(h * scale)
    return img.resize((new_w, new_h), Image.LANCZOS)
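# Illustrative example (not executed): with IMAGE_MAX_SIDE = 1024, a 4000x3000 photo is
# scaled by 1024/4000 = 0.256 to 1024x768, while an 800x600 image is returned unchanged
# because its longest side is already within the limit.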
def process_image(image_data_or_path: Any, message: str) -> Tuple[str, bool]:
    """
    Uses image_pipe to produce VQA text. Resizes the image to avoid token/feature mismatch issues.
    Returns the (safe) prompt injection plus a success flag.
    """
    global image_pipe
    success = False
    if image_pipe is None:
        return f"[Image Processing Error: VLM model not loaded.] **User Query:** {message}", False
    image = None
    try:
        if isinstance(image_data_or_path, str):
            image = Image.open(image_data_or_path).convert("RGB")
        elif isinstance(image_data_or_path, np.ndarray):
            image = Image.fromarray(image_data_or_path).convert("RGB")
        else:
            try:
                image = Image.open(BytesIO(image_data_or_path)).convert("RGB")
            except Exception:
                image = None
        if image is None:
            return f"[Image Processing Error: Could not open image.] **User Query:** {message}", False
        # Resize defensively before passing to the VLM pipeline (fixes token/feature mismatch errors)
        image = _resize_image_keep_aspect(image, IMAGE_MAX_SIDE)
        vqa_prompt = f"USER: <image>\n{message}\nASSISTANT:"
        results = None
        try:
            # preferred signature
            results = image_pipe(image, prompt=vqa_prompt)
        except TypeError:
            try:
                results = image_pipe(image)
            except Exception as e:
                print(f"Image pipeline call failed: {e}")
                results = None
        except Exception as e:
            print(f"Image pipeline call error: {e}")
            results = None
        raw_text = ""
        if results is None:
            raw_text = ""
        elif isinstance(results, dict):
            raw_text = results.get("generated_text") or results.get("text") or ""
        elif isinstance(results, list):
            first = results[0]
            if isinstance(first, dict):
                raw_text = first.get("generated_text") or first.get("text") or ""
            elif isinstance(first, str):
                raw_text = first
        elif isinstance(results, str):
            raw_text = results
        else:
            try:
                raw_text = str(results)
            except Exception:
                raw_text = ""
        vqa_response = raw_text.split("ASSISTANT:")[-1].strip() if raw_text else ""
        vqa_response = moderate_sanitize_for_ui(vqa_response)
        if not vqa_response or len(vqa_response) < 10:
            vqa_response = (
                "VQA analysis didn't return a clear answer. The image might be unclear or the question ambiguous. "
                "Please re-upload a clearer image, crop to the subject, or give a short instruction about what you'd like answered."
            )
            success = False
        else:
            success = True
        prompt_injection = f"**VQA Analysis:** {vqa_response}\n\n**User Query:** {moderate_sanitize_for_ui(message)}"
        return prompt_injection, success
    except Exception as e:
        print(f"Image processing exception: {e}")
        return f"[Image Processing Error: {e}] **User Query:** {moderate_sanitize_for_ui(message)}", False
# --- AUDIO / TTS ---
def transcribe_audio(audio_file_path: str) -> Tuple[str, str, gr.update, gr.update, bool, gr.update]:
    if stt_pipe is None or not audio_file_path:
        error_msg = "Error: Whisper model failed to load or no audio recorded."
        return "", error_msg, gr.update(interactive=True), gr.update(value="➤", interactive=True, elem_classes=["circle-btn", "send-mode"]), False, gr.update(visible=False)
    try:
        transcribed_text = stt_pipe(audio_file_path)["text"]
        new_button_update = gr.update(value="➤", interactive=True, elem_classes=["circle-btn", "send-mode"])
        return (
            transcribed_text.strip(),
            f"🎙️ Transcribed: '{transcribed_text.strip()}'",
            gr.update(interactive=True),
            new_button_update,
            True,
            gr.update(visible=False)
        )
    except Exception as e:
        error_msg = f"Transcription Error: {e}"
        return "", error_msg, gr.update(interactive=True), gr.update(value="➤", interactive=True, elem_classes=["circle-btn", "send-mode"]), False, gr.update(visible=False)
def text_to_audio(text: str, is_voice_chat: bool) -> Optional[str]:
    if not is_voice_chat:
        return None
    clean_text = re.sub(r'```.*?```|\[Image Processing Error:.*?\]|\*\*Web Search Results.*?$|\(file=.*?\)', '', text, flags=re.DOTALL | re.MULTILINE)
    if len(clean_text.strip()) > 5:
        try:
            audio_output_path = os.path.join(AUDIO_DIR, f"luna_response_{random.randint(1000, 9999)}.mp3")
            tts = gTTS(text=clean_text.strip(), lang='en')
            tts.save(audio_output_path)
            return audio_output_path
        except Exception as e:
            print(f"gTTS Error: {e}")
            return None
    return None
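# Illustrative note: the regex above strips fenced code blocks, "[Image Processing Error: ...]"
# markers, trailing "**Web Search Results" sections and "(file=...)" download links before
# synthesis, so the spoken output contains only the conversational part of the reply.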
# --- INTENT MAP & PARSING ---
INTENT_STATUS_MAP = {
    "code_generate": "Analyzing requirements and drafting code 💻...",
    "code_explain": "Reviewing code logic and writing explanation 💡...",
    "qa_general": "Drafting comprehensive general answer ✍️...",
    "greeting": "Replying to greeting 👋...",
    "vqa": "Analyzing VQA results and forming a final response 🧠...",
    "image_generate": "Generating image using Stable Diffusion (This may be slow on CPU) 🖼️...",
    "doc_generate": "Generating content and formatting DOCX file 📄...",
    "ppt_generate": "Generating content and formatting PPTX file 📊...",
    "open_camera": "Activating camera for image capture 📸...",
    "open_google": "Simulating external search link generation 🔗...",
    "default": "Luna is thinking...",
}

# Additional keyword-based intent inference (helps when the model doesn't include tags)
INTENT_KEYWORD_MAP = [
    (re.compile(r"\b(create|generate|make)\b.*\b(image|picture|photo|art)\b", flags=re.IGNORECASE), "image_generate"),
    (re.compile(r"\b(create|generate|make)\b.*\b(document|doc|report|letter|resume)\b", flags=re.IGNORECASE), "doc_generate"),
    (re.compile(r"\b(create|generate|make)\b.*\b(presentation|ppt|slides)\b", flags=re.IGNORECASE), "ppt_generate"),
]


def infer_intent_from_content(text: str) -> str:
    if not text:
        return "default"
    for patt, intent in INTENT_KEYWORD_MAP:
        if patt.search(text):
            return intent
    return "default"
def get_intent_status(raw_response: str, is_vqa_flow: bool) -> Tuple[str, str, str]:
    """
    Internal parsing: returns (intent, status, cleaned_display_text).
    cleaned_display_text preserves content but strips tags/garbage moderately.
    If no explicit [Intent:] tag is found, infer the intent from content keywords.
    """
    intent_match = re.search(r'\[Intent:\s*([\w\-\_]+)\]', raw_response, re.IGNORECASE)
    intent = intent_match.group(1).lower() if intent_match else None
    if is_vqa_flow:
        intent = "vqa"
    cleaned_text = moderate_sanitize_for_ui(raw_response)
    # If there is no explicit intent from tags, try to infer it from cleaned_text
    if not intent or intent == "default":
        inferred = infer_intent_from_content(cleaned_text)
        if inferred != "default":
            intent = inferred
    intent = intent or "default"
    status = INTENT_STATUS_MAP.get(intent, INTENT_STATUS_MAP["default"])
    return intent, status, cleaned_text
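# Illustrative example (not executed):
#   get_intent_status("[Intent: doc_generate][Confidence: 80] Quarterly sales report ...", is_vqa_flow=False)
# yields ("doc_generate", the matching INTENT_STATUS_MAP message, "Quarterly sales report ...");
# with is_vqa_flow=True the intent is forced to "vqa" regardless of any tag.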
# --- FILE / IMAGE GENERATION ---
def generate_file_content(content: str, history: List[Dict[str, str]], file_type: str):
    file_path = None
    try:
        if not content or len(content.strip()) < MIN_MEANINGFUL_LENGTH:
            history[-1]['content'] = (
                f"⚠️ I was asked to create a {file_type}, but I don't have enough details. "
                "Please provide a 1-2 sentence description of what the file should contain."
            )
            return history, None
        if file_type == "image":
            if img_gen_pipe is None:
                raise RuntimeError("Image generation model not loaded.")
            image = img_gen_pipe(content).images[0]
            file_filename = f"generated_img_{random.randint(1000, 9999)}.png"
            file_path = os.path.join(DOC_DIR, file_filename)
            image.save(file_path)
            display_content = f"🖼️ **Image Generated!**\n\n[Download {file_filename}](file={file_path})"
        elif file_type == "doc":
            doc = Document()
            doc.add_heading('Luna Generated Document', 0)
            doc.add_paragraph(content)
            file_filename = f"generated_doc_{random.randint(1000, 9999)}.docx"
            file_path = os.path.join(DOC_DIR, file_filename)
            doc.save(file_path)
            display_content = f"📄 **Document Generated!** Summary:\n\n{content[:200]}...\n\n[Download {file_filename}](file={file_path})"
        elif file_type == "ppt":
            prs = Presentation()
            slide = prs.slides.add_slide(prs.slide_layouts[0])
            slide.shapes.title.text = "Luna Generated Presentation"
            try:
                slide.placeholders[1].text = content[:200] + "..."
            except Exception:
                pass
            file_filename = f"generated_ppt_{random.randint(1000, 9999)}.pptx"
            file_path = os.path.join(DOC_DIR, file_filename)
            prs.save(file_path)
            display_content = f"📊 **Presentation Generated!** Summary:\n\n{content[:200]}...\n\n[Download {file_filename}](file={file_path})"
        else:
            raise ValueError(f"Unknown file type: {file_type}")
        history[-1]['content'] = display_content
    except Exception as e:
        error_msg = f"❌ **Error generating {file_type.upper()}:** {e}. Check logs/libs."
        history[-1]['content'] = error_msg
        file_path = None
    return history, file_path
# --- CORE GENERATOR FUNCTION ---
def chat_generator(message_from_input: str, image_input_data: Any, history: List[Dict[str, str]], stop_signal: bool, is_voice_chat: bool) -> Any:
    """
    - The assistant entry is appended only when generation actually starts (no empty box).
    - Streaming output is sanitized moderately to keep meaning while removing metadata.
    - When an image is attached, the VQA flow is strictly used (image model output injected into the LLM prompt).
    """
    if not history or history[-1]['role'] != 'user':
        yield history, False, "Error: Generator called in unexpected state (no user message found).", gr.update(interactive=True), gr.update(value="➤", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return
    last_user_index = len(history) - 1
    original_message = history[last_user_index]['content'] or ""
    # detect the VQA flow: if an image is attached, force the image flow
    is_vqa_flow = False
    if isinstance(image_input_data, str):
        is_vqa_flow = bool(image_input_data)
    elif isinstance(image_input_data, np.ndarray):
        is_vqa_flow = image_input_data.size > 0
    else:
        is_vqa_flow = image_input_data is not None
    vqa_success = False
    llm_input_message = original_message
    if is_vqa_flow:
        processed_message, vqa_success = process_image(image_input_data, original_message)
        history[last_user_index]['content'] = f"[IMAGE RECEIVED] {moderate_sanitize_for_ui(original_message)}"
        # ensure the LLM prompt includes the VQA analysis and the user message
        llm_input_message = processed_message
    # build the prompt
    prompt = f"SYSTEM: {SYSTEM_PROMPT}\n"
    for item in history[:-1]:
        role = item['role'].upper()
        content = item['content'] or ""
        if role == "ASSISTANT":
            prompt += f"LUNA: {content}\n"
        elif role == "USER":
            prompt += f"USER: {content}\n"
    prompt += f"USER: {llm_input_message}\nLUNA: "
    # append the assistant entry now
    assistant_initial_text = "✨ Luna is starting to think..."
    history.append({"role": "assistant", "content": assistant_initial_text})
    yield history, stop_signal, assistant_initial_text, gr.update(value="", interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
    time.sleep(0.12)
    full_response = ""
    current_intent = "default"
    iter_count = 0
    try:
        stream = llm.create_completion(
            prompt=prompt, max_tokens=8192,
            stop=["USER:", "SYSTEM:", "</s>"],
            echo=False, stream=True, temperature=0.7
        )
    except Exception as e:
        err = f"❌ Error generating response: {e}"
        history[-1]['content'] = moderate_sanitize_for_ui(err)
        yield history, False, err, gr.update(interactive=True), gr.update(value="➤", interactive=True), None, False, gr.update(visible=False), image_input_data, gr.update(), gr.update()
        return
    # stream tokens; sanitize moderately and cap length
    try:
        for output in stream:
            iter_count += 1
            if iter_count > STREAM_ITER_LIMIT:
                full_response += "\n\n[Stream aborted: iteration limit reached]"
                print("Stream aborted by iter limit.")
                break
            token = output["choices"][0].get("text", "")
            if not isinstance(token, str):
                token = str(token)
            full_response += token
            if len(full_response) > STREAM_CHAR_LIMIT:
                full_response = full_response[:STREAM_CHAR_LIMIT] + "\n\n[Truncated: length limit reached]"
                print("Stream truncated by char limit.")
                break
            current_intent, current_hint, interim = get_intent_status(full_response, is_vqa_flow and vqa_success)
            interim_ui = moderate_sanitize_for_ui(interim)
            if not interim_ui:
                interim_ui = "✨ Luna is forming a reply..."
            history[-1]['content'] = interim_ui
            yield history, stop_signal, current_hint, gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
    except Exception as e:
        _, _, salvage = get_intent_status(full_response, is_vqa_flow and vqa_success)
        salvage_ui = moderate_sanitize_for_ui(salvage) or f"⚠️ Streaming interrupted: {e}"
        history[-1]['content'] = salvage_ui
        yield history, False, f"⚠️ Streaming interrupted: {e}", gr.update(interactive=True), gr.update(value="➤", interactive=True), None, False, gr.update(visible=True), image_input_data, gr.update(), gr.update()
        return
    # post-process
    file_download_path = None
    _, _, content_for_tool = get_intent_status(full_response, is_vqa_flow and vqa_success)
    content_for_tool = moderate_sanitize_for_ui(content_for_tool)
    if current_intent == "image_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
            history[-1]['content'] = "I detected an image generation request but didn't get enough details. Please give a short description (e.g. 'red bicycle at sunrise, vivid colors')."
        else:
            history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
            yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
            history, file_download_path = generate_file_content(content_for_tool, history, "image")
    elif current_intent == "doc_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
            history[-1]['content'] = "I can create a document, but I need a 1-2 sentence description of what to include."
        else:
            history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
            yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
            history, file_download_path = generate_file_content(content_for_tool, history, "doc")
    elif current_intent == "ppt_generate":
        if not content_for_tool or len(content_for_tool.strip()) < MIN_MEANINGFUL_LENGTH:
            history[-1]['content'] = "I can make a short presentation; please give a title and 3-5 bullet points."
        else:
            history[-1]['content'] = INTENT_STATUS_MAP[current_intent]
            yield history, stop_signal, history[-1]['content'], gr.update(interactive=False), gr.update(value="Stop ⏹️", interactive=True), None, is_voice_chat, gr.update(visible=False), image_input_data, gr.update(), gr.update()
            history, file_download_path = generate_file_content(content_for_tool, history, "ppt")
    elif current_intent == "open_google":
        final_text = (content_for_tool or "").strip() + "\n\n🔍 **Action:** [Search Google](https://www.google.com/search?q=" + re.sub(r'\s+', '+', moderate_sanitize_for_ui(original_message)) + ")"
        history[-1]['content'] = moderate_sanitize_for_ui(final_text)
    elif current_intent == "open_camera":
        final_text = (content_for_tool or "").strip() + "\n\n📸 **Action:** Use the 'Google Lens' button to capture an image."
        history[-1]['content'] = moderate_sanitize_for_ui(final_text)
    else:
        final_response_content = check_confidence_and_augment(full_response, original_message)
        history[-1]['content'] = final_response_content
    if not history[-1]['content'] or not str(history[-1]['content']).strip():
        history[-1]['content'] = "Sorry, I couldn't produce a useful response. Could you rephrase or add details?"
    audio_file_path = text_to_audio(history[-1]['content'], is_voice_chat)
    hint = "✅ Response generated."
    yield history, False, hint, gr.update(interactive=True), gr.update(value="➤", interactive=True), audio_file_path, False, gr.update(visible=True), gr.update(value=None), gr.update(), file_download_path
# --- GRADIO WRAPPERS ---
def toggle_menu(current_visibility: bool) -> Tuple[bool, gr.update, gr.update, gr.update]:
    new_visibility = not current_visibility
    return new_visibility, gr.update(visible=new_visibility), gr.update(visible=False), gr.update(value="⬆️" if new_visibility else "➕")


def user_turn(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]]]:
    has_text = bool(user_message and user_message.strip())
    has_image = False
    if isinstance(staged_image_input, str):
        has_image = staged_image_input != ""
    elif isinstance(staged_image_input, np.ndarray):
        has_image = staged_image_input.size > 0
    else:
        has_image = staged_image_input is not None
    if not has_text and not has_image:
        return user_message, chat_history
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] and "thinking" in chat_history[-1]['content'].lower():
        return user_message, chat_history
    user_message_to_add = "Analyzing Staged Media." if (not has_text and has_image) else user_message.strip()
    chat_history.append({"role": "user", "content": moderate_sanitize_for_ui(user_message_to_add)})
    return "", chat_history


def stage_file_upload(file_path: str) -> Tuple[Any, str, gr.update, gr.update]:
    if file_path:
        return file_path, f"📎 File staged: {os.path.basename(file_path)}. Click send (➤).", gr.update(value="", interactive=True), gr.update(interactive=False)
    return None, "File upload cancelled.", gr.update(value="", interactive=True), gr.update(interactive=False)


def clear_staged_media() -> gr.update:
    return gr.update(value=None)


def manual_fact_check(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, gr.update]:
    if not history or not history[-1]['content']:
        return history, "Error: No final response to check.", gr.update(visible=False)
    last_user_prompt = ""
    for item in reversed(history):
        if item['role'] == 'user' and item['content']:
            last_user_prompt = item['content'].split("**User Query:**")[-1].strip().replace("[IMAGE RECEIVED]", "").strip()
            break
    if not last_user_prompt:
        return history, "Error: Could not find query.", gr.update(visible=False)
    web_results = web_search_tool(last_user_prompt)
    new_history = list(history)
    new_history[-1]['content'] += "\n\n" + moderate_sanitize_for_ui(web_results)
    return new_history, "✅ Double-checked with web facts.", gr.update(visible=False)


def auto_capture_camera(user_message: str, chat_history: List[Dict[str, str]], staged_image_input: Any) -> Tuple[str, List[Dict[str, str]], Any, gr.update, gr.update, gr.update, gr.update, gr.update]:
    _, chat_history = user_turn(user_message, chat_history, staged_image_input)
    if chat_history and chat_history[-1]['role'] == 'assistant' and chat_history[-1]['content'] == "":
        chat_history[-1]['content'] = "📸 Preparing camera capture..."
    return "", chat_history, staged_image_input, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(value="📸 Capturing in 3 seconds...", interactive=False), gr.update(value="➕")
# --- GRADIO UI ---
with gr.Blocks(theme=gr.themes.Soft(), title="Prototype") as demo:
    stop_signal = gr.State(value=False)
    is_voice_chat = gr.State(value=False)
    staged_image = gr.State(value=None)
    menu_visible_state = gr.State(value=False)

    gr.HTML("<h1 style='text-align: center; color: #4B0082;'>🌙 Prototype</h1>")
    hint_box = gr.Textbox(value="Ask anything", lines=1, show_label=False, interactive=False, placeholder="Luna's Action...", visible=True)
    file_download_output = gr.File(label="Generated File", visible=False)

    with gr.Row(visible=False) as fact_check_btn_row:
        gr.Column(min_width=1); btn_fact_check = gr.Button("Fact Check 🔎"); gr.Column(min_width=1)

    chatbot = gr.Chatbot(label="Luna", height=500, type='messages')

    with gr.Row(visible=False) as webcam_capture_row:
        webcam_capture_component = gr.Image(sources=["webcam"], type="numpy", show_label=False)
        close_webcam_btn = gr.Button("✅ Use this image")

    with gr.Row(visible=False) as audio_record_row:
        audio_input = gr.Audio(sources=["microphone"], type="filepath", show_label=False)

    with gr.Column(visible=False, elem_id="menu_options_row") as menu_options_row:
        file_input = gr.File(type="filepath", label="File Uploader", interactive=False)
        btn_take_photo = gr.Button("📸 Google Lens (Take Photo)")
        btn_add_files = gr.Button("📎 Upload File")

    with gr.Row(variant="panel") as input_row:
        btn_menu = gr.Button("➕", interactive=True, size="sm")
        txt = gr.Textbox(placeholder="Ask anything", show_label=False, lines=1, autofocus=True)
        mic_btn = gr.Button("🎙️", interactive=True, size="sm")
        combined_btn = gr.Button("➤", variant="primary", size="sm")

    audio_output = gr.Audio(visible=False)

    output_components = [chatbot, stop_signal, hint_box, txt, combined_btn, audio_output, is_voice_chat, fact_check_btn_row, staged_image, file_input, file_download_output]

    # wiring
    btn_menu.click(fn=toggle_menu, inputs=[menu_visible_state], outputs=[menu_visible_state, menu_options_row, fact_check_btn_row, btn_menu], queue=False)

    def prepare_file_upload():
        return gr.update(visible=False), gr.update(value="➕"), gr.update(visible=False), gr.update(interactive=True), gr.update(value="")

    btn_add_files.click(fn=prepare_file_upload, inputs=[], outputs=[menu_options_row, btn_menu, fact_check_btn_row, file_input, txt], queue=False)
    file_input.change(fn=stage_file_upload, inputs=[file_input], outputs=[staged_image, hint_box, txt, file_input], queue=False)

    btn_take_photo.click(
        fn=lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), "📸 Camera Active. Capture an image.", gr.update(value="➕")),
        inputs=[], outputs=[menu_options_row, webcam_capture_row, input_row, hint_box, btn_menu], queue=False
    )
    close_webcam_btn.click(
        fn=lambda img: (gr.update(visible=True), gr.update(visible=False), img, "📸 Photo staged: Click send (➤).", gr.update(value="")),
        inputs=[webcam_capture_component], outputs=[input_row, webcam_capture_row, staged_image, hint_box, txt], queue=False
    )

    mic_btn.click(
        fn=lambda: (gr.update(visible=False), gr.update(visible=True), "🎙️ Recording..."),
        inputs=[], outputs=[input_row, audio_record_row, hint_box], queue=False
    ).then(
        fn=simulate_recording_delay, inputs=[], outputs=[], queue=False
    ).then(
        fn=lambda: (gr.update(visible=True), gr.update(visible=False), "🎙️ Processing recording..."),
        inputs=[], outputs=[input_row, audio_record_row, hint_box], queue=False
    ).then(
        fn=transcribe_audio, inputs=audio_input, outputs=[txt, hint_box, txt, combined_btn, is_voice_chat, fact_check_btn_row], queue=False
    ).then(
        fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False
    ).then(
        fn=chat_generator, inputs=[txt, staged_image, chatbot, stop_signal, is_voice_chat], outputs=output_components, queue=True
    ).then(
        fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False
    )

    generator_inputs = [txt, staged_image, chatbot, stop_signal, is_voice_chat]
    txt.submit(fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False).then(
        fn=chat_generator, inputs=generator_inputs, outputs=output_components, queue=True
    ).then(fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False)
    combined_btn.click(fn=user_turn, inputs=[txt, chatbot, staged_image], outputs=[txt, chatbot], queue=False).then(
        fn=chat_generator, inputs=generator_inputs, outputs=output_components, queue=True
    ).then(fn=clear_staged_media, inputs=[], outputs=[staged_image], queue=False)
    btn_fact_check.click(fn=manual_fact_check, inputs=[chatbot], outputs=[chatbot, hint_box, fact_check_btn_row], queue=True)

demo.queue(max_size=20).launch(server_name="0.0.0.0")