import gradio as gr
import os
import re
import json
import tempfile
import zipfile
import traceback
from huggingface_hub import hf_hub_download
import base64
from PIL import Image
from io import BytesIO
print("=" * 50)
print("Starting VisualQuality-R1 GGUF")
print("=" * 50)
# Constants
REPO_ID = "mradermacher/VisualQuality-R1-7B-GGUF"
MODEL_FILE = "VisualQuality-R1-7B.Q4_K_M.gguf"
MMPROJ_FILE = "VisualQuality-R1-7B.mmproj-Q8_0.gguf"
# Prompts
PROMPT = (
"You are doing the image quality assessment task. Here is the question: "
"What is your overall rating on the quality of this picture? The rating should be a float between 1 and 5, "
"rounded to two decimal places, with 1 representing very poor quality and 5 representing excellent quality."
)
QUESTION_TEMPLATE_THINKING = "{Question} First output the thinking process in <think> </think> tags and then output the final answer with only one score in <answer> </answer> tags."
QUESTION_TEMPLATE_NO_THINKING = "{Question} Please only output the final answer with only one score in <answer> </answer> tags."
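# Illustrative example only (assumed format, not an actual model transcript):
# with the thinking template the model is expected to reply along the lines of
#   <think>The image is sharp and well exposed, with mild noise in the shadows ...</think>
#   <answer>4.25</answer>
# and extract_score()/extract_thinking() below parse exactly these tags.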
# Global variables
llm = None
print("Importing llama_cpp...")
try:
from llama_cpp import Llama
import llama_cpp
print(f"llama_cpp version: {llama_cpp.__version__ if hasattr(llama_cpp, '__version__') else 'unknown'}")
except Exception as e:
print(f"Error importing llama_cpp: {e}")
traceback.print_exc()
# Try to import the Qwen2-VL chat handler
chat_handler_class = None
chat_handler_name = None
try:
from llama_cpp.llama_chat_format import Qwen2VLChatHandler
chat_handler_class = Qwen2VLChatHandler
chat_handler_name = "Qwen2VLChatHandler"
print(f"✓ Found {chat_handler_name}")
except ImportError as e:
print(f"✗ Qwen2VLChatHandler not found: {e}")
# List the available chat handlers for diagnostics
if chat_handler_class is None:
print("\nListing available chat handlers...")
try:
from llama_cpp import llama_chat_format
handlers = [name for name in dir(llama_chat_format) if 'Handler' in name or 'Chat' in name]
print(f"Available handlers: {handlers}")
except Exception as e:
print(f"Could not list handlers: {e}")
def download_models():
"""Скачивание моделей"""
print(f"Downloading {MODEL_FILE}...")
model_path = hf_hub_download(
repo_id=REPO_ID,
filename=MODEL_FILE,
)
print(f"Model downloaded: {model_path}")
print(f"Downloading {MMPROJ_FILE}...")
mmproj_path = hf_hub_download(
repo_id=REPO_ID,
filename=MMPROJ_FILE,
)
print(f"MMProj downloaded: {mmproj_path}")
return model_path, mmproj_path
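# Note: hf_hub_download caches files under the Hugging Face cache directory
# (HF_HOME, by default ~/.cache/huggingface/hub), so repeated calls after a
# restart reuse the already-downloaded GGUF files instead of re-fetching them.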
def load_model():
"""Загрузка модели"""
global llm, chat_handler_class, chat_handler_name
if llm is not None:
return True
if chat_handler_class is None:
print("ERROR: No suitable chat handler found for Qwen2-VL!")
print("Please ensure llama-cpp-python >= 0.3.2 is installed")
return False
try:
model_path, mmproj_path = download_models()
print(f"Creating {chat_handler_name}...")
chat_handler = chat_handler_class(
clip_model_path=mmproj_path,
verbose=True
)
print("Chat handler created")
print("Loading LLM...")
llm = Llama(
model_path=model_path,
chat_handler=chat_handler,
n_ctx=4096,
n_threads=4,
n_gpu_layers=0,
verbose=True,
)
print("Model loaded successfully!")
return True
except Exception as e:
print(f"Error loading model: {e}")
traceback.print_exc()
return False
def image_to_data_uri(image):
"""Конвертация PIL Image в data URI"""
if image is None:
return None
if image.mode != "RGB":
image = image.convert("RGB")
max_size = 768
if max(image.size) > max_size:
ratio = max_size / max(image.size)
new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
image = image.resize(new_size, Image.LANCZOS)
buffered = BytesIO()
image.save(buffered, format="JPEG", quality=85)
img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
return f"data:image/jpeg;base64,{img_base64}"
def extract_score(text):
"""Извлечение оценки"""
try:
matches = re.findall(r'<answer>(.*?)</answer>', text, re.DOTALL)
if matches:
answer = matches[-1].strip()
else:
answer = text.strip()
score_match = re.search(r'\d+(\.\d+)?', answer)
if score_match:
score = float(score_match.group())
return min(max(score, 1.0), 5.0)
    except Exception:
pass
return None
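# Usage sketch (illustrative values):
#   extract_score("<answer>4.25</answer>")  -> 4.25
#   extract_score("quality is about 3")     -> 3.0  (falls back to the first number)
#   extract_score("<answer>7</answer>")     -> 5.0  (clamped to the 1-5 range)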
def extract_thinking(text):
"""Извлечение мышления"""
matches = re.findall(r'<think>(.*?)</think>', text, re.DOTALL)
if matches:
return matches[-1].strip()
return ""
def score_single_image(image, use_thinking=True):
"""Оценка одного изображения"""
global llm
print(f"score_single_image called, use_thinking={use_thinking}")
    if image is None:
        yield "❌ Upload an image first", "", ""
        return
    if not load_model():
        yield "❌ Failed to load model. Qwen2VLChatHandler not available. Check logs.", "", ""
        return
template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
prompt_text = template.format(Question=PROMPT)
print("Converting image...")
image_uri = image_to_data_uri(image)
print(f"Image converted, URI length: {len(image_uri)}")
messages = [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_uri}},
{"type": "text", "text": prompt_text}
]
}
]
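    # The multimodal chat handlers in llama-cpp-python accept OpenAI-style
    # messages where an "image_url" content part carries a base64 data URI.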
print("Starting generation...")
generated_text = ""
try:
response = llm.create_chat_completion(
messages=messages,
max_tokens=2048 if use_thinking else 256,
temperature=0.7,
top_p=0.95,
stream=True,
)
for chunk in response:
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if content:
generated_text += content
thinking = extract_thinking(generated_text)
score = extract_score(generated_text)
score_display = f"⭐ **Score: {score:.2f} / 5.00**" if score else "*Analyzing...*"
yield generated_text, thinking, score_display
print(f"Generation complete, length: {len(generated_text)}")
final_score = extract_score(generated_text)
final_thinking = extract_thinking(generated_text) if use_thinking else ""
if final_score is not None:
score_display = f"⭐ **Quality Score: {final_score:.2f} / 5.00**\n\n📊 **For Leaderboard:** `{final_score:.2f}`"
else:
score_display = "❌ Could not extract score"
yield generated_text, final_thinking, score_display
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
print(error_msg)
traceback.print_exc()
yield error_msg, "", ""
def process_batch(files, use_thinking=True, progress=gr.Progress()):
"""Batch processing"""
global llm
print(f"process_batch: {len(files) if files else 0} files")
if not files:
return "❌ No files", None
if not load_model():
return "❌ Failed to load model", None
results = []
template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
prompt_text = template.format(Question=PROMPT)
for i, file in enumerate(files):
filename = "unknown"
try:
if hasattr(file, 'name'):
image = Image.open(file.name)
filename = os.path.basename(file.name)
else:
image = Image.open(file)
filename = f"image_{i+1}.jpg"
print(f"Processing {i+1}/{len(files)}: {filename}")
image_uri = image_to_data_uri(image)
messages = [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_uri}},
{"type": "text", "text": prompt_text}
]
}
]
response = llm.create_chat_completion(
messages=messages,
max_tokens=2048 if use_thinking else 256,
temperature=0.7,
top_p=0.95,
)
generated_text = response["choices"][0]["message"]["content"]
score = extract_score(generated_text)
thinking = extract_thinking(generated_text) if use_thinking else ""
results.append({
"filename": filename,
"score": score if score else "N/A",
"thinking": thinking,
"raw_output": generated_text
})
print(f" Score: {score}")
progress((i + 1) / len(files), desc=f"{i+1}/{len(files)}: {filename}")
except Exception as e:
print(f" Error: {e}")
results.append({
"filename": filename,
"score": "ERROR",
"thinking": "",
"raw_output": str(e)
})
# Create files
try:
with tempfile.TemporaryDirectory() as tmpdir:
txt_file = os.path.join(tmpdir, "leaderboard_scores.txt")
with open(txt_file, "w") as f:
for r in results:
s = f"{r['score']:.2f}" if isinstance(r['score'], float) else str(r['score'])
f.write(f"{r['filename']}\t{s}\n")
json_file = os.path.join(tmpdir, "results.json")
with open(json_file, "w") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
csv_file = os.path.join(tmpdir, "scores.csv")
with open(csv_file, "w") as f:
f.write("filename,score\n")
for r in results:
s = f"{r['score']:.2f}" if isinstance(r['score'], float) else str(r['score'])
f.write(f"{r['filename']},{s}\n")
zip_path = os.path.join(tmpdir, "results.zip")
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.write(txt_file, "leaderboard_scores.txt")
zipf.write(json_file, "results.json")
zipf.write(csv_file, "scores.csv")
final_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
with open(zip_path, 'rb') as f:
final_zip.write(f.read())
final_zip.close()
except Exception as e:
return f"❌ Error saving: {e}", None
valid_scores = [r['score'] for r in results if isinstance(r['score'], float)]
avg = sum(valid_scores) / len(valid_scores) if valid_scores else 0
summary = f"""## ✅ Done!
**Processed:** {len(results)} | **OK:** {len(valid_scores)} | **Failed:** {len(results) - len(valid_scores)}
**Avg:** {avg:.2f} | **Min:** {min(valid_scores):.2f if valid_scores else 'N/A'} | **Max:** {max(valid_scores):.2f if valid_scores else 'N/A'}
| File | Score |
|------|-------|
""" + "\n".join([f"| {r['filename'][:40]} | {r['score']:.2f if isinstance(r['score'], float) else r['score']} |" for r in results[:10]])
return summary, final_zip.name
# Interface
print("Creating interface...")
with gr.Blocks(title="VisualQuality-R1") as demo:
gr.Markdown("""
# 🎨 VisualQuality-R1 (GGUF/CPU)
**Image Quality Assessment** | ~30-60 sec/image on CPU
[![Paper](https://img.shields.io/badge/arXiv-2505.14460-red)](https://arxiv.org/abs/2505.14460)
""")
with gr.Tabs():
with gr.TabItem("📷 Single"):
with gr.Row():
with gr.Column():
img = gr.Image(label="Image", type="pil", height=350)
think = gr.Checkbox(label="🧠 Thinking", value=True)
btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
with gr.Column():
score = gr.Markdown("*Upload image*")
thinking = gr.Textbox(label="Thinking", lines=6)
output = gr.Textbox(label="Output", lines=8)
btn.click(score_single_image, [img, think], [output, thinking, score])
with gr.TabItem("📁 Batch"):
with gr.Row():
with gr.Column():
files = gr.File(label="Images", file_count="multiple", file_types=["image"])
batch_think = gr.Checkbox(label="🧠 Thinking", value=False)
batch_btn = gr.Button("🚀 Process", variant="primary", size="lg")
with gr.Column():
summary = gr.Markdown("*Upload & Process*")
download = gr.File(label="📥 Results")
batch_btn.click(process_batch, [files, batch_think], [summary, download])
print("Starting server...")
if __name__ == "__main__":
demo.queue(max_size=5)
demo.launch(server_name="0.0.0.0", server_port=7860)