import gradio as gr
import os
import re
import json
import tempfile
import zipfile
import traceback
from huggingface_hub import hf_hub_download
import base64
from PIL import Image
from io import BytesIO

print("=" * 50)
print("Starting VisualQuality-R1 GGUF")
print("=" * 50)

# Constants
REPO_ID = "mradermacher/VisualQuality-R1-7B-GGUF"
MODEL_FILE = "VisualQuality-R1-7B.Q4_K_M.gguf"
MMPROJ_FILE = "VisualQuality-R1-7B.mmproj-Q8_0.gguf"

# Prompts
PROMPT = (
    "You are doing the image quality assessment task. Here is the question: "
    "What is your overall rating on the quality of this picture? The rating should be a float between 1 and 5, "
    "rounded to two decimal places, with 1 representing very poor quality and 5 representing excellent quality."
)
QUESTION_TEMPLATE_THINKING = "{Question} First output the thinking process in <think> </think> tags and then output the final answer with only one score in <answer> </answer> tags."
QUESTION_TEMPLATE_NO_THINKING = "{Question} Please only output the final answer with only one score in <answer> </answer> tags."
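# The two templates above mirror the VisualQuality-R1 prompting scheme: the "thinking"
# variant asks the model to reason inside <think></think> before giving its score in
# <answer></answer>, while the "no thinking" variant requests only the <answer> block.
# The numeric score is recovered from those tags by extract_score() further below.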
# Global state
llm = None

print("Importing llama_cpp...")
try:
    from llama_cpp import Llama
    import llama_cpp
    print(f"llama_cpp version: {llama_cpp.__version__ if hasattr(llama_cpp, '__version__') else 'unknown'}")
except Exception as e:
    print(f"Error importing llama_cpp: {e}")
    traceback.print_exc()

# Try to import a chat handler for Qwen2-VL
chat_handler_class = None
chat_handler_name = None
try:
    from llama_cpp.llama_chat_format import Qwen2VLChatHandler
    chat_handler_class = Qwen2VLChatHandler
    chat_handler_name = "Qwen2VLChatHandler"
    print(f"✓ Found {chat_handler_name}")
except ImportError as e:
    print(f"✗ Qwen2VLChatHandler not found: {e}")

# List the available chat handlers if the Qwen2-VL one is missing
if chat_handler_class is None:
    print("\nListing available chat handlers...")
    try:
        from llama_cpp import llama_chat_format
        handlers = [name for name in dir(llama_chat_format) if 'Handler' in name or 'Chat' in name]
        print(f"Available handlers: {handlers}")
    except Exception as e:
        print(f"Could not list handlers: {e}")
def download_models():
    """Download the GGUF weights and the vision projector."""
    print(f"Downloading {MODEL_FILE}...")
    model_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=MODEL_FILE,
    )
    print(f"Model downloaded: {model_path}")
    print(f"Downloading {MMPROJ_FILE}...")
    mmproj_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=MMPROJ_FILE,
    )
    print(f"MMProj downloaded: {mmproj_path}")
    return model_path, mmproj_path
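# Both files come from the same repo: the Q4_K_M-quantized language model and the
# mmproj (vision projector) file that llama.cpp multimodal chat handlers load via
# clip_model_path. hf_hub_download caches downloads locally, so repeated calls inside
# the same container reuse the already-fetched files.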
def load_model():
    """Load the model lazily, on first request."""
    global llm, chat_handler_class, chat_handler_name
    if llm is not None:
        return True
    if chat_handler_class is None:
        print("ERROR: No suitable chat handler found for Qwen2-VL!")
        print("Please ensure llama-cpp-python >= 0.3.2 is installed")
        return False
    try:
        model_path, mmproj_path = download_models()
        print(f"Creating {chat_handler_name}...")
        chat_handler = chat_handler_class(
            clip_model_path=mmproj_path,
            verbose=True
        )
        print("Chat handler created")
        print("Loading LLM...")
        llm = Llama(
            model_path=model_path,
            chat_handler=chat_handler,
            n_ctx=4096,
            n_threads=4,
            n_gpu_layers=0,
            verbose=True,
        )
        print("Model loaded successfully!")
        return True
    except Exception as e:
        print(f"Error loading model: {e}")
        traceback.print_exc()
        return False
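# n_gpu_layers=0 keeps inference entirely on the CPU, matching the "~30-60 sec/image on
# CPU" note in the UI below; n_ctx=4096 has to hold the image tokens produced by the
# projector plus the prompt and the generated <think>/<answer> text.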
def image_to_data_uri(image):
    """Convert a PIL Image to a base64 data URI."""
    if image is None:
        return None
    if image.mode != "RGB":
        image = image.convert("RGB")
    max_size = 768
    if max(image.size) > max_size:
        ratio = max_size / max(image.size)
        new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
        image = image.resize(new_size, Image.LANCZOS)
    buffered = BytesIO()
    image.save(buffered, format="JPEG", quality=85)
    img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{img_base64}"
def extract_score(text):
    """Extract the numeric score from the model output."""
    try:
        matches = re.findall(r'<answer>(.*?)</answer>', text, re.DOTALL)
        if matches:
            answer = matches[-1].strip()
        else:
            answer = text.strip()
        score_match = re.search(r'\d+(\.\d+)?', answer)
        if score_match:
            score = float(score_match.group())
            return min(max(score, 1.0), 5.0)
    except Exception:
        pass
    return None
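# Illustrative behaviour: extract_score("<think>...</think><answer>4.35</answer>")
# returns 4.35, and a tag-less reply such as "I would rate this 3.8" still yields 3.8
# via the fallback regex; anything outside [1.0, 5.0] is clamped into range.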
def extract_thinking(text):
    """Extract the reasoning from the <think> block, if present."""
    matches = re.findall(r'<think>(.*?)</think>', text, re.DOTALL)
    if matches:
        return matches[-1].strip()
    return ""
def score_single_image(image, use_thinking=True):
    """Score a single image, streaming partial output to the UI."""
    global llm
    print(f"score_single_image called, use_thinking={use_thinking}")
    if image is None:
        # This function is a generator (it yields below), so error messages must also be
        # yielded: a plain `return value` from a generator never reaches the Gradio outputs.
        yield "❌ Upload an image first", "", ""
        return
    if not load_model():
        yield "❌ Failed to load model. Qwen2VLChatHandler not available. Check logs.", "", ""
        return
    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)
    print("Converting image...")
    image_uri = image_to_data_uri(image)
    print(f"Image converted, URI length: {len(image_uri)}")
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_uri}},
                {"type": "text", "text": prompt_text}
            ]
        }
    ]
    print("Starting generation...")
    generated_text = ""
    try:
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=2048 if use_thinking else 256,
            temperature=0.7,
            top_p=0.95,
            stream=True,
        )
        for chunk in response:
            delta = chunk.get("choices", [{}])[0].get("delta", {})
            content = delta.get("content", "")
            if content:
                generated_text += content
                thinking = extract_thinking(generated_text)
                score = extract_score(generated_text)
                score_display = f"⭐ **Score: {score:.2f} / 5.00**" if score is not None else "*Analyzing...*"
                yield generated_text, thinking, score_display
        print(f"Generation complete, length: {len(generated_text)}")
        final_score = extract_score(generated_text)
        final_thinking = extract_thinking(generated_text) if use_thinking else ""
        if final_score is not None:
            score_display = f"⭐ **Quality Score: {final_score:.2f} / 5.00**\n\n📊 **For Leaderboard:** `{final_score:.2f}`"
        else:
            score_display = "❌ Could not extract score"
        yield generated_text, final_thinking, score_display
    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"
        print(error_msg)
        traceback.print_exc()
        yield error_msg, "", ""
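# Because score_single_image is a generator, Gradio streams each yielded triple
# (raw output, thinking, score markdown) into the three output components as tokens
# arrive, so the score panel updates live instead of waiting for the full completion.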
def process_batch(files, use_thinking=True, progress=gr.Progress()):
    """Batch processing."""
    global llm
    print(f"process_batch: {len(files) if files else 0} files")
    if not files:
        return "❌ No files", None
    if not load_model():
        return "❌ Failed to load model", None
    results = []
    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)
    for i, file in enumerate(files):
        filename = "unknown"
        try:
            if hasattr(file, 'name'):
                image = Image.open(file.name)
                filename = os.path.basename(file.name)
            else:
                image = Image.open(file)
                filename = f"image_{i+1}.jpg"
            print(f"Processing {i+1}/{len(files)}: {filename}")
            image_uri = image_to_data_uri(image)
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": image_uri}},
                        {"type": "text", "text": prompt_text}
                    ]
                }
            ]
            response = llm.create_chat_completion(
                messages=messages,
                max_tokens=2048 if use_thinking else 256,
                temperature=0.7,
                top_p=0.95,
            )
            generated_text = response["choices"][0]["message"]["content"]
            score = extract_score(generated_text)
            thinking = extract_thinking(generated_text) if use_thinking else ""
            results.append({
                "filename": filename,
                "score": score if score is not None else "N/A",
                "thinking": thinking,
                "raw_output": generated_text
            })
            print(f"  Score: {score}")
            progress((i + 1) / len(files), desc=f"{i+1}/{len(files)}: {filename}")
        except Exception as e:
            print(f"  Error: {e}")
            results.append({
                "filename": filename,
                "score": "ERROR",
                "thinking": "",
                "raw_output": str(e)
            })
    # Create result files
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            txt_file = os.path.join(tmpdir, "leaderboard_scores.txt")
            with open(txt_file, "w") as f:
                for r in results:
                    s = f"{r['score']:.2f}" if isinstance(r['score'], float) else str(r['score'])
                    f.write(f"{r['filename']}\t{s}\n")
            json_file = os.path.join(tmpdir, "results.json")
            with open(json_file, "w") as f:
                json.dump(results, f, indent=2, ensure_ascii=False)
            csv_file = os.path.join(tmpdir, "scores.csv")
            with open(csv_file, "w") as f:
                f.write("filename,score\n")
                for r in results:
                    s = f"{r['score']:.2f}" if isinstance(r['score'], float) else str(r['score'])
                    f.write(f"{r['filename']},{s}\n")
            zip_path = os.path.join(tmpdir, "results.zip")
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                zipf.write(txt_file, "leaderboard_scores.txt")
                zipf.write(json_file, "results.json")
                zipf.write(csv_file, "scores.csv")
            # Copy the zip out of the temporary directory so Gradio can still serve it
            final_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
            with open(zip_path, 'rb') as f:
                final_zip.write(f.read())
            final_zip.close()
    except Exception as e:
        return f"❌ Error saving: {e}", None
    valid_scores = [r['score'] for r in results if isinstance(r['score'], float)]
    avg = sum(valid_scores) / len(valid_scores) if valid_scores else 0
    # Pre-format min/max and table rows; conditional expressions are not valid inside
    # an f-string format spec, so building the strings first avoids a ValueError.
    min_s = f"{min(valid_scores):.2f}" if valid_scores else "N/A"
    max_s = f"{max(valid_scores):.2f}" if valid_scores else "N/A"
    rows = []
    for r in results[:10]:
        s = f"{r['score']:.2f}" if isinstance(r['score'], float) else str(r['score'])
        rows.append(f"| {r['filename'][:40]} | {s} |")
    summary = f"""## ✅ Done!
**Processed:** {len(results)} | **OK:** {len(valid_scores)} | **Failed:** {len(results) - len(valid_scores)}
**Avg:** {avg:.2f} | **Min:** {min_s} | **Max:** {max_s}

| File | Score |
|------|-------|
""" + "\n".join(rows)
    return summary, final_zip.name
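# The downloadable zip bundles three views of the same results: leaderboard_scores.txt
# (tab-separated "filename<TAB>score", presumably for the leaderboard referenced in the
# single-image output), scores.csv, and results.json with the full raw output and
# thinking text per image.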
# Interface
print("Creating interface...")
with gr.Blocks(title="VisualQuality-R1") as demo:
    gr.Markdown("""
# 🎨 VisualQuality-R1 (GGUF/CPU)
**Image Quality Assessment** | ~30-60 sec/image on CPU

[arXiv:2505.14460](https://arxiv.org/abs/2505.14460)
""")
    with gr.Tabs():
        with gr.TabItem("📷 Single"):
            with gr.Row():
                with gr.Column():
                    img = gr.Image(label="Image", type="pil", height=350)
                    think = gr.Checkbox(label="🧠 Thinking", value=True)
                    btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
                with gr.Column():
                    score = gr.Markdown("*Upload image*")
                    thinking = gr.Textbox(label="Thinking", lines=6)
                    output = gr.Textbox(label="Output", lines=8)
            btn.click(score_single_image, [img, think], [output, thinking, score])
        with gr.TabItem("📁 Batch"):
            with gr.Row():
                with gr.Column():
                    files = gr.File(label="Images", file_count="multiple", file_types=["image"])
                    batch_think = gr.Checkbox(label="🧠 Thinking", value=False)
                    batch_btn = gr.Button("🚀 Process", variant="primary", size="lg")
                with gr.Column():
                    summary = gr.Markdown("*Upload & Process*")
                    download = gr.File(label="📥 Results")
            batch_btn.click(process_batch, [files, batch_think], [summary, download])

print("Starting server...")

if __name__ == "__main__":
    demo.queue(max_size=5)
    demo.launch(server_name="0.0.0.0", server_port=7860)
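# Not shown above: the Space's requirements.txt. Judging only from the imports in this
# file, a minimal set would be (assumption, versions unpinned):
#   gradio
#   huggingface_hub
#   pillow
#   llama-cpp-python
# The verbose prints throughout exist to surface exactly where startup fails in the
# Space logs (llama_cpp import, chat handler availability, model download, or load).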