Spaces:
Runtime error
Runtime error
import gradio as gr | |
import argparse | |
import shutil | |
import os | |
from video_keyframe_detector.cli import keyframeDetection | |
import numpy as np | |
import cv2 | |
from llava.constants import ( | |
IMAGE_TOKEN_INDEX, | |
DEFAULT_IMAGE_TOKEN, | |
DEFAULT_IM_START_TOKEN, | |
DEFAULT_IM_END_TOKEN, | |
IMAGE_PLACEHOLDER, | |
) | |
from PIL import Image | |
from llava.conversation import conv_templates, SeparatorStyle | |
from llava.model.builder import load_pretrained_model | |
from llava.utils import disable_torch_init | |
from llava.mm_utils import ( | |
process_images, | |
tokenizer_image_token, | |
get_model_name_from_path, | |
KeywordsStoppingCriteria, | |
) | |
import torch | |
def extract_keyframes(video_path, num_keyframes=12): | |
video_id = video_path.split('/')[-1].strip().split('.')[0] | |
os.makedirs("temp", exist_ok=True) | |
keyframeDetection(video_path, "temp", 0.6) | |
video_frame_list = sorted(os.listdir(os.path.join("temp", "keyFrames")), key=lambda x: int(x.split('.')[0][8:])) | |
os.makedirs(os.path.join("video_frames", video_id), exist_ok=True) | |
selected_frame_idx_set = set(np.linspace(1, len(video_frame_list) - 1, num_keyframes).astype(int)) | |
cnt = 0 | |
for i in range(len(video_frame_list)): | |
if i in selected_frame_idx_set: | |
source_file = os.path.join("temp", "keyFrames", video_frame_list[i]) | |
target_file = os.path.join("video_frames", video_id, f"frame_{cnt}.jpg") | |
shutil.copyfile(source_file, target_file) | |
cnt += 1 | |
shutil.rmtree("temp", ignore_errors=True) | |
def concatenate_frames(video_path): | |
os.makedirs("concatenated_frames", exist_ok=True) | |
video_id = video_path.split('/')[-1].strip().split('.')[0] | |
image_frame_dir = os.path.join("video_frames", video_id) | |
image_frame_list = sorted(os.listdir(os.path.join(image_frame_dir)), key=lambda x: int(x.split('.')[0].split('_')[1])) | |
img_list = [] | |
for image_frame in image_frame_list: | |
img_frame = cv2.imread(os.path.join(image_frame_dir, image_frame)) | |
img_list.append(img_frame) | |
img_row1 = cv2.hconcat(img_list[:4]) | |
img_row2 = cv2.hconcat(img_list[4:8]) | |
img_row3 = cv2.hconcat(img_list[8:12]) | |
img_v = cv2.vconcat([img_row1, img_row2, img_row3]) | |
cv2.imwrite(os.path.join("concatenated_frames", f"{video_id}.jpg"), img_v) | |
def image_parser(args): | |
out = args.image_file.split(args.sep) | |
return out | |
def load_image(image_file): | |
if image_file.startswith("http") or image_file.startswith("https"): | |
response = requests.get(image_file) | |
image = Image.open(BytesIO(response.content)).convert("RGB") | |
else: | |
image = Image.open(image_file).convert("RGB") | |
return image | |
def load_images(image_files): | |
out = [] | |
for image_file in image_files: | |
image = load_image(image_file) | |
out.append(image) | |
return out | |
def eval_model(args, model_name, tokenizer, model, image_processor, context_len): | |
# Model | |
DEFAULT_IMAGE_TOKEN = "<image>" | |
disable_torch_init() | |
qs = args.query | |
image_token_se = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN | |
if model.config.mm_use_im_start_end: | |
qs = image_token_se + "\n" + qs | |
else: | |
qs = DEFAULT_IMAGE_TOKEN + "\n" + qs | |
if "llama-2" in model_name.lower(): | |
conv_mode = "llava_llama_2" | |
elif "v1" in model_name.lower(): | |
conv_mode = "llava_v1" | |
elif "mpt" in model_name.lower(): | |
conv_mode = "mpt" | |
else: | |
conv_mode = "llava_v0" | |
if args.conv_mode is not None and conv_mode != args.conv_mode: | |
print( | |
"[WARNING] the auto inferred conversation mode is {}, while `--conv-mode` is {}, using {}".format( | |
conv_mode, args.conv_mode, args.conv_mode | |
) | |
) | |
else: | |
args.conv_mode = conv_mode | |
conv = conv_templates[args.conv_mode].copy() | |
conv.append_message(conv.roles[0], qs) | |
conv.append_message(conv.roles[1], None) | |
prompt = conv.get_prompt() | |
image_files = image_parser(args) | |
images = load_images(image_files) | |
images_tensor = process_images( | |
images, | |
image_processor, | |
model.config | |
).to(model.device, dtype=torch.float16) | |
input_ids = ( | |
tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt") | |
.unsqueeze(0) | |
.cuda() | |
) | |
stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2 | |
keywords = [stop_str] | |
stopping_criteria = KeywordsStoppingCriteria(keywords, tokenizer, input_ids) | |
with torch.inference_mode(): | |
output_ids = model.generate( | |
input_ids, | |
images=images_tensor, | |
do_sample=True, | |
temperature=0.2, | |
max_new_tokens=1024, | |
use_cache=True, | |
stopping_criteria=[stopping_criteria], | |
) | |
input_token_len = input_ids.shape[1] | |
n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item() | |
if n_diff_input_output > 0: | |
print( | |
f"[Warning] {n_diff_input_output} output_ids are not the same as the input_ids" | |
) | |
outputs = tokenizer.batch_decode( | |
output_ids[:, input_token_len:], skip_special_tokens=True | |
)[0] | |
outputs = outputs.strip() | |
if outputs.endswith(stop_str): | |
outputs = outputs[: -len(stop_str)] | |
outputs = outputs.strip() | |
return outputs | |
def generate_video_caption(video_path): | |
model_path = "liuhaotian/llava-v1.5-7b" | |
model_name = get_model_name_from_path(model_path) | |
tokenizer, model, image_processor, context_len = load_pretrained_model(model_path, None, model_name) | |
video_id = video_path.split('/')[-1].strip().split('.')[0] | |
image_file = os.path.join("concatenated_frames", f"{video_id}.jpg") | |
prompt = "In a short paragraph, describe the process in the video." | |
args = type('Args', (), { | |
"model_path": model_path, | |
"model_base": None, | |
"model_name": get_model_name_from_path(model_path), | |
"query": prompt, | |
"conv_mode": None, | |
"image_file": image_file, | |
"sep": ",", | |
"max_new_tokens": 1024, | |
"temperature": 0.2 | |
})() | |
video_caption = eval_model(args, model_name, tokenizer, model, image_processor, context_len).replace("images", "video").replace("image", "video") | |
return video_caption | |
def clean_files_and_folders(): | |
shutil.rmtree("concatenated_frames") | |
shutil.rmtree("video_frames") | |
def video_to_text(video_file): | |
video_path = video_file.name | |
extract_keyframes(video_path) | |
concatenate_frames(video_path) | |
video_caption = generate_video_caption(video_path) | |
clean_files_and_folders() | |
return video_caption | |
iface = gr.Interface( | |
fn=video_to_text, | |
inputs=gr.File(file_types=["video"]), | |
outputs="text", | |
title="Video to Text Transcription", | |
description="Upload a video and get the transcribed text" | |
) | |
iface.launch() |