# AR_Testing / app.py
import gradio as gr
import torch
import math
import os
from transformers import AutoTokenizer, AutoModel, AutoProcessor
from huggingface_hub import snapshot_download
from decord import VideoReader, cpu
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
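# Runtime dependencies (assumed): gradio, torch, transformers, huggingface_hub,
# decord, torchvision, pillow; the flash-attn package is additionally required
# for use_flash_attn=True in the model load below.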
# === Visual preprocessing ===
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)
transform = Compose([
    Resize((448, 448)),
    ToTensor(),
    Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])
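# Note: InternVL3's vision encoder expects 448x448 inputs with ImageNet
# statistics. The official pipeline additionally tiles high-resolution frames
# into multiple 448x448 patches (dynamic preprocessing); a single resize per
# frame is a deliberate simplification here, matching the num_patches_list of
# [1] per frame used in evaluate_ar() below.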
# === Model loading ===
PERSISTENT_DIR = "/data/internvl3_model"  # persistent storage path
MODEL_NAME = "OpenGVLab/InternVL3-14B"
# On first run: download the model and cache it under /data
if not os.path.exists(PERSISTENT_DIR):
    print("⏬ First run: downloading model to persistent storage...")
    # snapshot_download does not accept trust_remote_code; that flag belongs to
    # the transformers from_pretrained loaders below.
    snapshot_download(repo_id=MODEL_NAME, local_dir=PERSISTENT_DIR)
else:
    print("✅ Found model in persistent cache.")
# Load the tokenizer and processor from the local cache
tokenizer = AutoTokenizer.from_pretrained(PERSISTENT_DIR, trust_remote_code=True)
processor = AutoProcessor.from_pretrained(PERSISTENT_DIR, trust_remote_code=True)
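# Note: model.chat() below only consumes the tokenizer; the processor is loaded
# here for completeness and potential future preprocessing use.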
def split_model(model_path):
    """Build a layer-level device_map that spreads the LLM across all GPUs."""
    from transformers import AutoConfig
    device_map = {}
    world_size = torch.cuda.device_count()
    config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    num_layers = config.llm_config.num_hidden_layers
    # GPU 0 also hosts the vision tower and embeddings, so it is treated as
    # half a GPU when distributing the LLM layers.
    num_layers_per_gpu = math.ceil(num_layers / (world_size - 0.5))
    num_layers_per_gpu = [num_layers_per_gpu] * world_size
    num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.5)
    layer_cnt = 0
    for i, num_layer in enumerate(num_layers_per_gpu):
        for _ in range(num_layer):
            device_map[f'language_model.model.layers.{layer_cnt}'] = i
            layer_cnt += 1
    # Pin the vision encoder, projector, embeddings, final norm, output head,
    # and the last transformer layer to GPU 0 so inputs and outputs stay
    # co-located with the vision features.
    device_map['vision_model'] = 0
    device_map['mlp1'] = 0
    device_map['language_model.model.tok_embeddings'] = 0
    device_map['language_model.model.embed_tokens'] = 0
    device_map['language_model.output'] = 0
    device_map['language_model.model.norm'] = 0
    device_map['language_model.model.rotary_emb'] = 0
    device_map['language_model.lm_head'] = 0
    device_map[f'language_model.model.layers.{num_layers - 1}'] = 0
    return device_map
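# Assumption: at least one CUDA device is available. With world_size == 1 the
# formula above still yields a valid map (all layers on GPU 0); with no GPU it
# would raise an IndexError, and device_map="auto" would be a safer fallback.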
device_map = split_model(PERSISTENT_DIR)
model = AutoModel.from_pretrained(
    PERSISTENT_DIR,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    use_flash_attn=True,
    trust_remote_code=True,
    device_map=device_map
).eval()
# === Video frame sampling ===
def extract_frames(video_path, num_frames=8):
    """Uniformly sample num_frames RGB frames and stack them into one tensor."""
    vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(vr)
    frame_indices = torch.linspace(0, total_frames - 1, num_frames).long().tolist()
    images = []
    for idx in frame_indices:
        img = Image.fromarray(vr[idx].asnumpy()).convert("RGB")
        images.append(transform(img))
    return torch.stack(images)  # shape: (num_frames, 3, 448, 448)
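# Assumption: 8 uniformly spaced frames are enough to judge occlusion and
# rendering quality; raise num_frames for longer clips at the cost of more
# GPU memory per inference call.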
# === Inference function ===
def evaluate_ar(video):
    # Newer Gradio versions pass the uploaded video as a filepath string;
    # older ones pass a tempfile object with a .name attribute. Handle both.
    video_path = video if isinstance(video, str) else video.name
    frames = extract_frames(video_path).to(torch.bfloat16).cuda()
    prompt = "Evaluate the quality of AR occlusion and rendering in the uploaded video."  # swap in a task-specific prompt as needed
    num_patches = [1] * frames.shape[0]
    # InternVL's video chat convention: one <image> placeholder per frame,
    # matching the entries in num_patches_list.
    video_prefix = "".join(f"Frame{i + 1}: <image>\n" for i in range(len(num_patches)))
    output, _ = model.chat(
        tokenizer,
        frames,
        video_prefix + prompt,
        generation_config=dict(max_new_tokens=512),
        num_patches_list=num_patches,
        history=None,
        return_history=True
    )
    return output
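# Single-turn usage: the returned history from return_history=True is
# discarded, so each upload is evaluated independently. Quick local smoke test
# (hypothetical path), bypassing the UI:
#   print(evaluate_ar("sample_clips/ar_demo.mp4"))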
# === Gradio interface ===
gr.Interface(
    fn=evaluate_ar,
    inputs=gr.Video(label="Upload your AR video"),
    outputs="text",
    title="InternVL3 AR Evaluation (Single-turn)",
    description="Upload a video clip. The model will analyze AR occlusion and rendering quality."
).launch()