import os
import subprocess
import tempfile
# subprocess.run('pip install flash-attn==2.8.0 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import threading
# subprocess.check_call([os.sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor, AutoTokenizer, TextIteratorStreamer
from analytics import AnalyticsLogger
from kernels import get_kernel
from typing import Any, Optional, Dict
from PIL import Image
import base64
import io

# vllm_flash_attn3 = get_kernel("kernels-community/vllm-flash-attn3")
# torch._dynamo.config.disable = True

HF_LE_LLM_READ_TOKEN = os.environ.get('HF_LE_LLM_READ_TOKEN')

from huggingface_hub import login

login(token=HF_LE_LLM_READ_TOKEN)

# Only the last assignment takes effect; earlier candidates kept as comments.
# MODEL_ID = "le-llm/lapa-v0.1-reasoning-only-32768"
# MODEL_ID = "le-llm/lapa-v0.1-instruct"
# MODEL_ID = "le-llm/lapa-v0.1-matt-instruction-5e06"
MODEL_ID = "le-llm/lapa-v0.1-reprojected"

logger = AnalyticsLogger()
def _begin_analytics_session():
    # Called once per client on app load
    _ = logger.start_session(MODEL_ID)
def load_model():
    """Lazy-load model, tokenizer, and optional processor (for ZeroGPU)."""
    device = "cuda"  # if torch.cuda.is_available() else "cpu"

    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    processor = None
    try:
        processor = AutoProcessor.from_pretrained(MODEL_ID)
    except Exception as err:  # pragma: no cover - informative fallback
        print(f"Warning: AutoProcessor not available ({err}). Falling back to tokenizer.")

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        dtype=torch.bfloat16,  # if device == "cuda" else torch.float32,
        device_map="auto",  # if device == "cuda" else None,
        attn_implementation="flash_attention_2",  # or "kernels-community/vllm-flash-attn3"
    )  # .cuda()
    print(f"Selected device: {device}")
    return model, tokenizer, processor, device
# Load model/tokenizer once at import time; ZeroGPU then attaches and releases
# the GPU per request rather than re-loading the weights on each call.
model, tokenizer, processor, device = load_model()
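# Note: `spaces` is imported above but nothing here is decorated with it. On a
# ZeroGPU Space the GPU-bound entry point is normally wrapped with the `spaces`
# decorator so a GPU is attached per call. A sketch only; the `duration` value
# is an assumption to tune to the expected generation time:
#
#     @spaces.GPU(duration=120)
#     def bot(history):
#         ...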
def user(user_message, image_data, history: list):
    """Format user message with optional image."""
    user_message = user_message or ""
    updated_history = list(history)
    has_content = False

    stripped_message = user_message.strip()

    # If we have an image, save it to a temp file so Gradio can display it
    if image_data is not None:
        fd, tmp_path = tempfile.mkstemp(suffix=".jpg")
        os.close(fd)
        image_data.save(tmp_path, format="JPEG")

        text_content = stripped_message if stripped_message else "Describe this image"

        # Store the text and the image as two consecutive user messages
        updated_history.append({
            "role": "user",
            "content": text_content,
        })
        updated_history.append({
            "role": "user",
            "content": {
                "path": tmp_path,
                "alt_text": "User uploaded image",
            },
        })
        has_content = True
    elif stripped_message:
        updated_history.append({"role": "user", "content": stripped_message})
        has_content = True

    if not has_content:
        # Nothing to submit yet; keep inputs unchanged
        return user_message, image_data, history

    return "", None, updated_history
def append_example_message(x: gr.SelectData, history):
    # Append a clicked example prompt as a user turn
    if x.value.get("text"):
        history.append({"role": "user", "content": x.value["text"]})
    return history
def _extract_text_from_content(content: Any) -> str:
    """Extract text from message content for logging."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        text_parts = []
        for item in content:
            if isinstance(item, dict) and item.get("type") == "text":
                text_parts.append(item.get("text", ""))
        return " ".join(text_parts) if text_parts else "[Image]"
    return str(content)
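# Examples:
#   _extract_text_from_content("hello")                                  -> "hello"
#   _extract_text_from_content([{"type": "text", "text": "hi"}])         -> "hi"
#   _extract_text_from_content([{"type": "image", "image_url": {...}}])  -> "[Image]"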
def _clean_history_for_display(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Remove internal metadata fields like _base64 before displaying in Gradio."""
    cleaned = []
    for message in history:
        cleaned_message = {"role": message.get("role", "user")}
        content = message.get("content")

        if isinstance(content, str):
            cleaned_message["content"] = content
        elif isinstance(content, list):
            cleaned_content = []
            for item in content:
                if isinstance(item, dict):
                    # Drop internal keys such as _base64
                    cleaned_item = {k: v for k, v in item.items() if not k.startswith("_")}
                    cleaned_content.append(cleaned_item)
                else:
                    cleaned_content.append(item)
            cleaned_message["content"] = cleaned_content
        else:
            cleaned_message["content"] = content

        cleaned.append(cleaned_message)
    return cleaned
def format_message_with_image(
    text: str, role: str, image: Optional[Image.Image] = None
) -> Dict[str, Any]:
    """Format a message in the OpenAI-style schema used by vLLM, with an optional image."""
    if image is not None:
        # Convert PIL image to a base64 data URI
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_base64 = base64.b64encode(buffered.getvalue()).decode()
        return {
            "role": role,
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                },
            ],
        }
    else:
        return {"role": role, "content": text}
def bot(
    history: list[dict[str, Any]],
):
    """Generate bot response with support for text and images."""
    max_tokens = 4096
    temperature = 0.7
    top_p = 0.95

    # Early return if no input
    if not history:
        return

    # Extract the last user message for logging
    last_user_msg = next((msg for msg in reversed(history) if msg.get("role") == "user"), None)
    user_message_text = _extract_text_from_content(last_user_msg.get("content")) if last_user_msg else ""
    print("User message:", user_message_text)

    # Check whether any message contains images
    has_images = any(
        isinstance(msg.get("content"), list)
        and any(item.get("type") == "image" for item in msg.get("content") if isinstance(item, dict))
        for msg in history
    )
    model_inputs = None

    # Use processor if images are present
    if processor is not None and has_images:
        try:
            processor_history = []
            for msg in history:
                role = msg.get("role", "user")
                content = msg.get("content")

                if isinstance(content, str):
                    processor_history.append({"role": role, "content": content})
                elif isinstance(content, list):
                    formatted_content = []
                    for item in content:
                        if isinstance(item, dict):
                            # Add text
                            if item.get("type") == "text":
                                formatted_content.append({"type": "text", "text": item.get("text", "")})
                            elif item.get("type") == "image":
                                # Use _base64 metadata if available, otherwise load from path
                                pil_image = None
                                if "_base64" in item:
                                    img_url = item["_base64"]
                                    if img_url.startswith("data:image"):
                                        base64_data = img_url.split(",")[1]
                                        img_data = base64.b64decode(base64_data)
                                        pil_image = Image.open(io.BytesIO(img_data))
                                elif "path" in item:
                                    pil_image = Image.open(item["path"])

                                if pil_image is not None:
                                    # formatted_content.append({"type": "image", "image": pil_image})
                                    buffered = io.BytesIO()
                                    pil_image.save(buffered, format="JPEG")
                                    img_base64 = base64.b64encode(buffered.getvalue()).decode()
                                    formatted_content.append({
                                        "type": "image_url",
                                        "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"},
                                    })
                    if formatted_content:
                        processor_history.append({"role": role, "content": formatted_content})

            # apply_chat_template is the standard processor entry point for
            # chat-formatted multimodal input in recent transformers versions
            model_inputs = processor.apply_chat_template(
                processor_history,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors="pt",
            ).to(model.device)
            print("Using processor for vision input")
        except Exception as exc:
            print(f"Processor failed: {exc}")
            model_inputs = None
    # Fallback to tokenizer for text-only
    if model_inputs is None:
        # Convert to text-only format for tokenizer
        text_history = []
        for msg in history:
            role = msg.get("role", "user")
            content = msg.get("content")
            text_content = _extract_text_from_content(content)
            if text_content:
                text_history.append({"role": role, "content": text_content})

        if text_history:
            input_text = tokenizer.apply_chat_template(
                text_history,
                tokenize=False,
                add_generation_prompt=True,
            )
            # The chat template already prepends BOS and tokenizer() below would
            # add it again, so strip the first occurrence to avoid doubling it
            if input_text and tokenizer.bos_token:
                input_text = input_text.replace(tokenizer.bos_token, "", 1)
            model_inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
            print("Using tokenizer for text-only input")

    if model_inputs is None:
        return
    # Streamer setup: skip_prompt=True yields only newly generated text
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True)

    # Run model.generate in a background thread so tokens can be streamed as they arrive
    generation_kwargs = dict(
        **model_inputs,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=64,
        do_sample=True,
        streamer=streamer,
    )
    thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    history.append({"role": "assistant", "content": ""})

    # Yield tokens as they come in
    for new_text in streamer:
        history[-1]["content"] += new_text
        yield _clean_history_for_display(history)

    assistant_message = history[-1]["content"]
    logger.log_interaction(user=user_message_text, answer=assistant_message)
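# Note: if a stream is cancelled via the Stop button, Gradio stops consuming this
# generator, but the background generate() thread keeps running until it reaches
# max_new_tokens; wiring a StoppingCriteria would be needed to halt it early.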
# --- UI (drop-in, compatible with older Gradio versions) ---

# Ukrainian-inspired theme with deep, muted colors reflecting an unbeatable spirit:
THEME = gr.themes.Soft(
    primary_hue="blue",     # deep blue for the Ukrainian sky and resolve
    secondary_hue="amber",  # warm amber for golden fields and determination
    neutral_hue="stone",    # earthy stone for strength and foundation
)
# Load CSS from an external file
def load_css():
    try:
        with open("static/style.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: static/style.css not found")
        return ""

CSS = load_css()

def _clear_chat():
    return "", None, []
with gr.Blocks(theme=THEME, css=CSS, fill_height=True) as demo:
    demo.load(fn=_begin_analytics_session, inputs=None, outputs=None)

    # Header (no gr.Box to avoid version issues)
    gr.HTML(
        """
        <div id="app-header">
            <div class="app-title">✨ LAPA</div>
            <div class="app-subtitle">LLM for Ukrainian Language</div>
        </div>
        """
    )

    with gr.Row(equal_height=True):
        # Left side: Chat
        with gr.Column(scale=7, elem_id="left-pane"):
            with gr.Column(elem_id="chat-card"):
                chatbot = gr.Chatbot(
                    type="messages",
                    height=560,
                    render_markdown=True,
                    show_copy_button=True,
                    show_label=False,
                    # likeable=True,
                    allow_tags=["think"],
                    elem_id="chatbot",
                    examples=[
                        {"text": i}
                        for i in [
                            "хто тримає цей район?",
                            "Напиши історію про Івасика-Телесика",
                            "Яка найвища гора в Україні?",
                            "Як звали батька Тараса Григоровича Шевченка?",
                            "Яка з цих гір не знаходиться у Європі? Говерла, Монблан, Гран-Парадізо, Еверест",
                            "Дай відповідь на питання\nЧому у качки жовті ноги?",
                        ]
                    ],
                )

                image_input = gr.Image(
                    label="Attach image (optional)",
                    type="pil",
                    sources=["upload", "clipboard"],
                    height=200,
                    interactive=True,
                    elem_id="image-input",
                )

                # ChatGPT-style input box with stop button
                with gr.Row(elem_id="chat-input-row"):
                    msg = gr.Textbox(
                        label=None,
                        placeholder="Message… (Press Enter to send)",
                        autofocus=True,
                        lines=1,
                        max_lines=6,
                        container=False,
                        show_label=False,
                        elem_id="chat-input",
                        elem_classes=["chat-input-box"],
                    )
                    stop_btn_visible = gr.Button(
                        "⏹️",
                        variant="secondary",
                        elem_id="stop-btn-visible",
                        elem_classes=["stop-btn-chat"],
                        visible=False,
                        size="sm",
                    )

                # Hidden buttons for functionality
                with gr.Row(visible=True, elem_id="hidden-buttons"):
                    send_btn = gr.Button("Send", variant="primary", elem_id="send-btn")
                    stop_btn = gr.Button("Stop", variant="secondary", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", variant="secondary", elem_id="clear-btn")
                    # export_btn = gr.Button("Export chat (.md)", variant="secondary", elem_classes=["rounded-btn", "secondary-btn"])
                    # exported_file = gr.File(label="", interactive=False, visible=True)

                gr.HTML('<div class="footer-tip">Shortcuts: Enter to send • Shift+Enter for new line</div>')
    # Helper functions for managing UI state
    def show_stop_button():
        return gr.update(visible=True)

    def hide_stop_button():
        return gr.update(visible=False)

    # Events: Enter, the Send button, and example clicks all run the same user -> bot pipeline
    e1 = msg.submit(
        fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )
    e2 = send_btn.click(
        fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )
    e3 = chatbot.example_select(
        fn=append_example_message, inputs=[chatbot], outputs=[chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )

    # Stop cancels running events (both buttons work)
    stop_btn.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
    stop_btn_visible.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)

    # Clear chat + input
    clear_btn.click(fn=_clear_chat, inputs=None, outputs=[msg, image_input, chatbot])

    # Export markdown (currently disabled)
    # export_btn.click(fn=_export_markdown, inputs=chatbot, outputs=exported_file)
    # Load and inject external JavaScript
    def load_javascript():
        try:
            with open("static/script.js", "r", encoding="utf-8") as f:
                return f"<script>{f.read()}</script>"
        except FileNotFoundError:
            print("Warning: static/script.js not found")
            return ""

    gr.HTML(load_javascript())
if __name__ == "__main__":
    demo.queue().launch()