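"""Evaluate a CoLVA InternVL2-4B checkpoint (loaded from ./work_dirs/colva_internvl2_4b)
on the VideoRefer-Bench-Q benchmark.

For each question the referred object masks are rendered as numbered contours on the
annotated key frame, the frames are fed to the model as a multi-image prompt, and one
JSON record per prediction is written to the output file.
"""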
import argparse
import json
import math
import os
import random
from typing import List

import cv2
import imageio
import numpy as np
import pycocotools.mask as maskUtils
import torch
import torch.nn.functional as F
import torchvision.transforms as T
from decord import VideoReader, cpu
from distinctipy import distinctipy
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer, CLIPImageProcessor

NUM_FRAMES = 8
MAX_FRAMES = 32
NUM_FRAMES_PER_SECOND = 1

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

|
def annToMask(mask_ann, h=None, w=None): |
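    """Convert a COCO-style segmentation annotation (polygon, uncompressed RLE,
    or compressed RLE) into a binary numpy mask of shape (h, w)."""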
|
    if isinstance(mask_ann, list):
        # polygon: merge all polygon pieces into a single RLE
        rles = maskUtils.frPyObjects(mask_ann, h, w)
        rle = maskUtils.merge(rles)
    elif isinstance(mask_ann['counts'], list):
        # uncompressed RLE
        rle = maskUtils.frPyObjects(mask_ann, h, w)
    else:
        # already compressed RLE
        rle = mask_ann
    mask = maskUtils.decode(rle)
    return mask


def frame_sample(duration, mode='uniform', num_frames=None, fps=None): |
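    """Return the frame indices to sample.

    'uniform' takes the midpoint of `num_frames` equal segments of the clip;
    'fps' takes roughly NUM_FRAMES_PER_SECOND frames per second of video.
    """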
|
    if mode == 'uniform':
        assert num_frames is not None, "Number of frames must be provided for uniform sampling."
        # Split [0, duration - 1] into num_frames equal segments and take each midpoint.
        seg_size = float(duration - 1) / num_frames

        frame_ids = []
        for i in range(num_frames):
            start = seg_size * i
            end = seg_size * (i + 1)
            frame_ids.append((start + end) / 2)

        return np.round(np.array(frame_ids) + 1e-6).astype(int)
    elif mode == 'fps':
        assert fps is not None, "FPS must be provided for FPS sampling."
        segment_len = min(fps // NUM_FRAMES_PER_SECOND, duration)
        return np.arange(segment_len // 2, duration, segment_len, dtype=int)
    else:
        raise ValueError(f"Unsupported sampling mode: {mode}")


def process_video(video_path, processor, s=None, e=None, aspect_ratio='pad', num_frames=NUM_FRAMES, frame_idx=None): |
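    """Decode sampled frames from `video_path` (a video file, GIF, frame directory,
    array, or list of frames/paths).

    If `frame_idx` is given, those additional frames are decoded as well and are
    returned *before* the sampled frames. Returns (frames, height, width).
    """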
|
    if isinstance(video_path, str):
        # Clamp/normalise the optional [s, e] time window (in seconds).
        if s is not None and e is not None:
            s = s if s >= 0. else 0.
            e = e if e >= 0. else 0.
            if s > e:
                s, e = e, s
            elif s == e:
                e = s + 1

        # Open the source and read its fps / frame count.
        if os.path.isdir(video_path):
            frame_files = sorted(os.listdir(video_path))
            fps = 3
            num_frames_of_video = len(frame_files)
        elif video_path.endswith('.gif'):
            gif_reader = imageio.get_reader(video_path)
            fps = 25
            num_frames_of_video = len(gif_reader)
        else:
            vreader = VideoReader(video_path, ctx=cpu(0), num_threads=1)
            fps = vreader.get_avg_fps()
            num_frames_of_video = len(vreader)

        f_start = 0 if s is None else max(int(s * fps) - 1, 0)
        f_end = num_frames_of_video - 1 if e is None else min(int(e * fps) - 1, num_frames_of_video - 1)
        frame_indices = list(range(f_start, f_end + 1))

        duration = len(frame_indices)

        # Sample uniformly when num_frames is given, otherwise sample by fps.
        if num_frames is None:
            sampled_frame_indices = [frame_indices[i] for i in frame_sample(duration, mode='fps', fps=fps)]
        else:
            sampled_frame_indices = [frame_indices[i] for i in frame_sample(duration, mode='uniform', num_frames=num_frames)]

        # Load the sampled frames plus, if requested, the extra frames in frame_idx.
        if os.path.isdir(video_path):
            video_data = [Image.open(os.path.join(video_path, frame_files[f_idx])) for f_idx in sampled_frame_indices]
            if frame_idx is not None:
                frame_data = []
                for idx in frame_idx:
                    frame = Image.open(os.path.join(video_path, frame_files[idx])).convert('RGB')
                    frame_data.append(np.array(frame))
            else:
                frame_data = None
        elif video_path.endswith('.gif'):
            video_data = [Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)) for idx, frame in enumerate(gif_reader) if idx in sampled_frame_indices]
            if frame_idx is not None:
                frame_data = [frame for index, frame in enumerate(gif_reader) if index in frame_idx]
            else:
                frame_data = None
        else:
            # Older decord versions expose .asnumpy(), newer ones .numpy().
            try:
                video_data = [Image.fromarray(frame) for frame in vreader.get_batch(sampled_frame_indices).asnumpy()]
            except Exception:
                video_data = [Image.fromarray(frame) for frame in vreader.get_batch(sampled_frame_indices).numpy()]
            if frame_idx is not None:
                try:
                    frame_data = vreader.get_batch(frame_idx).asnumpy()
                except Exception:
                    frame_data = vreader.get_batch(frame_idx).numpy()
            else:
                frame_data = None

    elif isinstance(video_path, np.ndarray):
        video_data = [Image.fromarray(f) for f in video_path]
        frame_data = None
    elif isinstance(video_path, list) and isinstance(video_path[0], np.ndarray):
        video_data = [Image.fromarray(f) for f in video_path]
        frame_data = None
    elif isinstance(video_path, list) and isinstance(video_path[0], str):
        video_data = [Image.open(f) for f in video_path]
        frame_data = None
    elif isinstance(video_path, list) and isinstance(video_path[0], Image.Image):
        video_data = video_path
        frame_data = None
    else:
        raise ValueError(f"Unsupported video path type: {type(video_path)}")

    # Pad with black frames if fewer than num_frames were decoded.
    while num_frames is not None and len(video_data) < num_frames:
        # PIL's .size is (width, height); reverse it to build an (H, W, 3) array.
        video_data.append(Image.fromarray(np.zeros((*video_data[-1].size[::-1], 3), dtype=np.uint8)))

    video_data = video_data[:MAX_FRAMES]

    height, width = np.array(video_data[0]).shape[:2]

    # Return the extra annotated frames (as RGB PIL images) followed by the sampled frames.
    if frame_data is None:
        frame_data = []
    else:
        frame_data = [Image.fromarray(f).convert('RGB') if isinstance(f, np.ndarray) else f for f in frame_data]
    return frame_data + video_data, height, width


class VideoRefer_Bench_Q(Dataset): |
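    """VideoRefer-Bench-Q dataset: each item provides the decoded frames (annotated
    key frames first), the region masks, and the multiple-choice question."""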
|
    def __init__(self, video_folder, data_list, processor, mode):
        self.video_folder = video_folder
        self.data_list = data_list
        self.processor = processor
        self.mode = mode

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        line = self.data_list[idx]

        line['Question'] = line['Question'].replace('<region>', '[<region>]')
        question = line['Question'] + ' ' + ' '.join(line['options']) + '. Answer with the option\'s letter from the given choices directly.'
        video_name = line['video']
        annotations = line['annotation']

        # In 'single' mode only the annotated key frame of each object is kept.
        if self.mode == 'single':
            frame_idx = str(line['frame_idx'])
            annotations_single = []
            for ann in annotations:
                annotations_single.append({frame_idx: ann[frame_idx]})
            annotations = annotations_single

        # Collect the union of annotated frames and, per object, the indices into it.
        ann_indices = []
        all_frames = set()
        for ann in annotations:
            all_frames.update(list(ann.keys()))
        all_frames = list(all_frames)
        frame_nums = len(all_frames)
        for ann in annotations:
            frame_list = list(ann.keys())
            indices = []
            for frame in frame_list:
                indices.append(all_frames.index(frame))
            ann_indices.append(indices)

        ann_indices = [ann_indices]
        frame_nums = [frame_nums]
        all_frames = [int(f) for f in all_frames]

        video_path = os.path.join(self.video_folder, video_name)

        video_pil_image_list, height, width = process_video(video_path, processor=self.processor, aspect_ratio='square', frame_idx=all_frames)

        # Decode one binary mask per (object, annotated frame) pair.
        masks = []
        for anns in annotations:
            for ann_idx in anns.keys():
                if anns[ann_idx]['segmentation'] is None:
                    mask = np.zeros((height, width))
                else:
                    mask = annToMask(anns[ann_idx]['segmentation'], height, width)
                masks.append(mask)
        masks = np.array(masks)
        masks = torch.Tensor(masks)
        masks = masks.unsqueeze(0)

        return {
            'video_name': line['video'],
            'frames': video_pil_image_list,
            'masks': masks,
            'question': question,
            'ann_indices': ann_indices,
            'frame_nums': frame_nums,
            'answer': line['Answer'],
            'types': line['type'],
        }


def split_list(lst, n):
    """Split a list into n (roughly) equal-sized chunks."""
    chunk_size = math.ceil(len(lst) / n)
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


def get_chunk(lst, n, k):
    chunks = split_list(lst, n)
    return chunks[k]


def collate_fn(batch):
    vin = [x['video_name'] for x in batch]
    vid = [x['frames'] for x in batch]
    msk = [x['masks'] for x in batch]
    qs = [x['question'] for x in batch]
    aid = [x['ann_indices'] for x in batch]
    fn = [x['frame_nums'] for x in batch]
    ans = [x['answer'] for x in batch]
    tps = [x['types'] for x in batch]
    return vin, vid, msk, qs, aid, fn, ans, tps


def build_videorefer_bench_q_eval(args, processor): |
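    """Load the benchmark questions, select the chunk for this worker
    (--num-chunks/--chunk-idx), and wrap them in a dataset plus DataLoader."""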
    questions = json.load(open(args.question_file))
    questions = get_chunk(questions, args.num_chunks, args.chunk_idx)
    dataset = VideoRefer_Bench_Q(args.video_folder, questions, processor, args.mode)
    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=False, num_workers=args.num_workers, collate_fn=collate_fn)
    return dataloader, dataset


def contour_rendering(image, masks, mask_ids=None): |
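    """Draw one colored contour per mask on `image` (in place) and label it with the
    corresponding entry of `mask_ids` (or its 1-based index).

    Returns False if any mask has no contour, True otherwise.
    """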
|
    colors = distinctipy.get_colors(len(masks) + 1)
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_thickness = 2
    font_scale_list = []
    label_list = []
    color_list = []
    label_loc_list = []
    for anno_i in range(len(masks)):
        mask = masks[anno_i]
        contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        # Avoid near-white contour colors; fall back to the extra color instead.
        if colors[anno_i][0] > 0.9 and colors[anno_i][1] > 0.9 and colors[anno_i][2] > 0.9:
            color_anno_i = (colors[-1][2] * 255, colors[-1][1] * 255, colors[-1][0] * 255)
        else:
            color_anno_i = (colors[anno_i][2] * 255, colors[anno_i][1] * 255, colors[anno_i][0] * 255)

        cv2.drawContours(image, contours, -1, color=color_anno_i, thickness=2)

        # Place the label at the centroid of the largest contour.
        cnt_area = []
        cnt_centroid = []
        cnt_bbox = []
        for cnt in contours:
            cnt_area.append(cv2.contourArea(cnt))
            M = cv2.moments(cnt)
            x, y, w, h = cv2.boundingRect(cnt)
            if M["m00"] > 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
            else:
                cx, cy = x + w / 2, y + h / 2
            cnt_centroid.append((cx, cy))
            cnt_bbox.append((w, h))
        select_cnt = 0
        if len(cnt_area) > 1:
            select_cnt = np.argmax(np.array(cnt_area))
        try:
            select_centroid = cnt_centroid[select_cnt]
        except IndexError:
            # Empty mask: no contour was found, so this sample cannot be rendered.
            return False
        visual_prompt_id = anno_i + 1 if mask_ids is None else mask_ids[anno_i]

        boxW, boxH = cnt_bbox[select_cnt]
        thickness = 1 if max(boxH, boxW) < 25 else text_thickness

        # Pick the largest font scale whose text stays within 15% of the contour box.
        ok = False
        for scale in reversed(range(5, 60, 1)):
            textSize = cv2.getTextSize(f"{visual_prompt_id}", font, scale / 10, thickness)
            textW, textH = textSize[0][0], textSize[0][1]
            if textH / boxH > 0.15 or textW / boxW > 0.15:
                continue
            font_scale_list.append(scale / 10)
            ok = True
            break
        if not ok:
            font_scale_list.append(0.5)
        label_list.append(visual_prompt_id)
        color_list.append(color_anno_i)

        (base_w, base_h), bottom = cv2.getTextSize(f"{visual_prompt_id}", font, font_scale_list[-1], thickness)
        label_loc_list.append((
            int(select_centroid[0] - base_w / 2),
            int(select_centroid[1] + (base_h + bottom) / 2)
        ))

    # Draw every label with the common (smallest) font scale.
    font_scale = min(font_scale_list)
    for anno_i in range(len(label_list)):
        (base_w, base_h), bottom = cv2.getTextSize(f"{label_list[anno_i]}", font, font_scale, thickness)
        cv2.rectangle(image, (label_loc_list[anno_i][0], int(label_loc_list[anno_i][1] - base_h - bottom / 2)),
                      (label_loc_list[anno_i][0] + base_w, int(label_loc_list[anno_i][1] + bottom / 2)),
                      color_list[anno_i], -1, 8)
        cv2.putText(image, f"{label_list[anno_i]}", label_loc_list[anno_i], font, font_scale,
                    (255, 255, 255), thickness)

    return True


def build_transform(input_size): |
|
    MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
    transform = T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=MEAN, std=STD)
    ])
    return transform


def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size): |
|
    best_ratio_diff = float('inf')
    best_ratio = (1, 1)
    area = width * height
    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)
        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                best_ratio = ratio
    return best_ratio


def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False): |
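    """InternVL-style dynamic tiling: resize the image to the closest supported
    aspect ratio and split it into image_size x image_size tiles, optionally
    appending a global thumbnail."""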
|
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # Enumerate candidate tiling grids (i x j tiles) within [min_num, max_num].
    target_ratios = set(
        (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
        i * j <= max_num and i * j >= min_num)
    target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size
        )
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)
    return processed_images


def load_image(image, input_size=448, max_num=12): |
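    """Tile a PIL image with dynamic_preprocess and return the stacked, normalized
    pixel values as a (num_tiles, 3, input_size, input_size) tensor."""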
    transform = build_transform(input_size=input_size)
    images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
    pixel_values = [transform(image) for image in images]
    pixel_values = torch.stack(pixel_values)
    return pixel_values


def split_model(model_name): |
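    """Build a `device_map` that spreads the language-model layers across all visible
    GPUs while keeping the vision tower, embeddings, and output head on GPU 0."""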
|
    device_map = {}
    world_size = torch.cuda.device_count()
    num_layers = {
        'InternVL2-1B': 24, 'InternVL2-2B': 24, 'InternVL2-4B': 32, 'InternVL2-8B': 32,
        'InternVL2-26B': 48, 'InternVL2-40B': 60, 'InternVL2-Llama3-76B': 80}[model_name]
    # GPU 0 also hosts the ViT, so it is treated as half a GPU when balancing layers.
    num_layers_per_gpu = math.ceil(num_layers / (world_size - 0.5))
    num_layers_per_gpu = [num_layers_per_gpu] * world_size
    num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.5)
    layer_cnt = 0
    for i, num_layer in enumerate(num_layers_per_gpu):
        for j in range(num_layer):
            device_map[f'language_model.model.layers.{layer_cnt}'] = i
            layer_cnt += 1
    device_map['vision_model'] = 0
    device_map['mlp1'] = 0
    device_map['language_model.model.tok_embeddings'] = 0
    device_map['language_model.model.embed_tokens'] = 0
    device_map['language_model.output'] = 0
    device_map['language_model.model.norm'] = 0
    device_map['language_model.lm_head'] = 0
    device_map[f'language_model.model.layers.{num_layers - 1}'] = 0

    return device_map


def main(args): |
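    """Run the model over every benchmark question and append one JSON record per
    prediction to args.output_file."""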
|
    path = "./work_dirs/colva_internvl2_4b"
    model = AutoModel.from_pretrained(
        path,
        torch_dtype=torch.bfloat16,
        low_cpu_mem_usage=True,
        use_flash_attn=True,
        trust_remote_code=True).eval().cuda()
    tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False)

    generation_config = dict(max_new_tokens=1024, do_sample=True)

    answer_file = os.path.expanduser(args.output_file)
    os.makedirs(os.path.dirname(answer_file), exist_ok=True)
    ans_file = open(answer_file, "w")

    val_loader, val_dataset = build_videorefer_bench_q_eval(args, processor=None)
    for i in range(len(val_dataset)):
        ret_dict = val_dataset[i]

        video_name = ret_dict['video_name']
        frame_list = ret_dict['frames']
        masks = ret_dict['masks']
        question = ret_dict['question']
        ann_indices = ret_dict['ann_indices']
        frame_nums = ret_dict['frame_nums']
        answer = ret_dict['answer']
        question_type = ret_dict['types']

        # Render the referred regions as numbered contours on the annotated key frame.
        overlied_image = cv2.cvtColor(np.asarray(frame_list[0]), cv2.COLOR_RGB2BGR)
        sub_question_list = question.split('[<region>]')
        assert len(sub_question_list) - 1 == masks.shape[1]
        object_tags = []
        for ii in range(masks.shape[1]):
            object_tags.append(sub_question_list[ii].split(' ')[-1])
            assert 'object' in object_tags[-1], object_tags[-1]

        np_masks = masks[0].numpy().astype(np.uint8)
        is_ok = contour_rendering(overlied_image, np_masks, object_tags)
        if not is_ok:
            continue

        overlied_image = Image.fromarray(cv2.cvtColor(overlied_image, cv2.COLOR_BGR2RGB))
        frame_list[0] = overlied_image

        # Preprocess every frame into InternVL pixel values (one tile per frame).
        all_pixel_values, num_patches_list = [], []
        for image in frame_list:
            pixel_values = load_image(image, max_num=1).to(torch.bfloat16).cuda()
            all_pixel_values.append(pixel_values)
            num_patches_list.append(pixel_values.shape[0])
        all_pixel_values = torch.cat(all_pixel_values, dim=0)

        video_prefix = ''.join([f'Frame{i+1}: <image>\n' for i in range(len(frame_list))])
        question = video_prefix + question

        response = model.chat(tokenizer, all_pixel_values, question, generation_config,
                              num_patches_list=num_patches_list, history=None)

        print("question: ", question)
        print("response: ", response)

        record = {
            'video': video_name,
            'Answer': answer,
            'pred': response,
            'type': question_type,
        }
        ans_file.write(json.dumps(record) + "\n")
    ans_file.close()


if __name__ == "__main__": |
|
    parser = argparse.ArgumentParser()

    parser.add_argument('--video-folder', help='Directory containing the benchmark videos.', required=True)
    parser.add_argument('--question-file', help='Path to the ground-truth file containing the questions.', required=True)
    parser.add_argument('--output-file', help='Path of the JSONL file to write the model predictions to.', required=True)
    parser.add_argument("--batch-size", type=int, default=1)
    parser.add_argument("--num-workers", type=int, default=1)
    parser.add_argument("--num-chunks", type=int, default=1)
    parser.add_argument("--chunk-idx", type=int, default=0)
    parser.add_argument("--mode", type=str, default='single')
    args = parser.parse_args()

    main(args)
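
# Example invocation (all paths are placeholders; adjust them to your setup):
#   python <this_script>.py \
#       --video-folder /path/to/VideoRefer-Bench/videos \
#       --question-file /path/to/VideoRefer-Bench-Q.json \
#       --output-file ./work_dirs/videorefer_bench_q/answers.jsonl \
#       --mode single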