import gradio as gr
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
from PIL import Image
import cv2
import os
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Device count: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name()}")

# Allow TF32 matmuls on Ampere+ GPUs: faster, with negligible precision loss
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# Note: avoid torch.set_default_tensor_type('torch.cuda.FloatTensor') here.
# It is deprecated, and forcing CUDA float32 defaults conflicts with the
# float16 weights loaded below; device placement is handled explicitly
# with .to(device) instead.
def load_model():
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")
        model = Qwen2VLForConditionalGeneration.from_pretrained(
            "Qwen/Qwen2-VL-2B-Instruct",
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto",
            low_cpu_mem_usage=True,
        )
        processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct")
        return model, processor, device
    except Exception as e:
        print(f"Error loading model: {e}")
        return None, None, None

model, processor, device = load_model()
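# Minimal guard (an addition beyond the original flow): load_model()
# returns (None, None, None) on failure, which would otherwise surface
# later as an opaque AttributeError inside generate_response().
if model is None or processor is None:
    raise RuntimeError("Model failed to load; see the error printed above.")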
SYSTEM_PROMPT = """You are an expert technical analyst specializing in identifying bugs, fixing errors, and explaining code functions from visual inputs. When presented with an image or video:
1. If you see code, analyze it for potential bugs or errors, and suggest fixes.
2. If you see a function or algorithm, explain its purpose and how it works.
3. If you see a technical diagram or flowchart, interpret its meaning and purpose.
4. For any technical content, provide detailed explanations and insights.
Always maintain a professional and technical tone in your responses."""
def process_content(file, user_prompt):
    if file is None:
        return "No content provided. Please upload an image or video of technical content."
    # gr.File yields a tempfile-like object (older Gradio) or a plain path
    # string (Gradio 4+); handle both.
    file_path = file.name if hasattr(file, "name") else file
    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension in ['.jpg', '.jpeg', '.png', '.bmp']:
        image = Image.open(file_path)
        return analyze_image(image, user_prompt)
    elif file_extension in ['.mp4', '.avi', '.mov']:
        return analyze_video(file_path, user_prompt)
    else:
        return "Unsupported file type. Please provide an image (jpg, jpeg, png, bmp) or video (mp4, avi, mov) of technical content."
def analyze_image(image, prompt):
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": f"Based on the system instructions, {prompt}"},
            ],
        },
    ]
    return generate_response(messages)
def analyze_video(video_path, prompt, max_frames=16, frame_interval=30, max_resolution=224):
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return "Error: Could not open video file."
        frames = []
        frame_count = 0
        # Sample every `frame_interval`-th frame, up to `max_frames` frames
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_interval == 0:
                # Downscale so the longer side is at most `max_resolution`,
                # preserving the aspect ratio
                h, w = frame.shape[:2]
                if h > w:
                    new_h, new_w = max_resolution, int(w * max_resolution / h)
                else:
                    new_h, new_w = int(h * max_resolution / w), max_resolution
                frame = cv2.resize(frame, (new_w, new_h))
                # OpenCV decodes to BGR; PIL and the model expect RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(frame))
            frame_count += 1
        if not frames:
            return "Error: No frames could be extracted from the video."
        return generate_response([
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "video", "video": frames},
                    {"type": "text", "text": f"Based on the system instructions, {prompt}"},
                ],
            },
        ])
    except Exception as e:
        return f"Error processing video: {e}"
    finally:
        if cap is not None:
            cap.release()
def generate_response(messages):
    try:
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        # Move inputs to the model's device. Using BatchFeature.to() keeps the
        # object type, so inputs.input_ids below still resolves (a plain dict
        # comprehension would break that attribute access).
        inputs = inputs.to(device)
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=512,
                do_sample=True,
                top_k=20,
                top_p=0.9,
                temperature=0.7,
            )
        # Trim the prompt tokens so only the newly generated text is decoded
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
        # Release cached GPU memory between requests
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return output_text[0]
    except Exception as e:
        return f"Error generating response: {e}"
# Gradio interface
iface = gr.Interface(
    fn=process_content,
    inputs=[
        gr.File(label="Upload Image or Video of Technical Content"),
        gr.Textbox(
            label="Enter your technical question",
            placeholder="e.g., Identify any bugs in this code and suggest fixes",
            value="Analyze this technical content and provide insights.",
        ),
    ],
    outputs="text",
    title="Technical Content Analysis",
    description="Upload an image or video of code, diagrams, or technical content. Ask questions about bugs, errors, or explanations of functions.",
)

# share=True creates a temporary public link when run locally; Hugging Face
# Spaces ignores the flag, since a Space already serves a public URL.
iface.launch(share=True)