Spaces:
Sleeping
Sleeping
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor | |
import torch | |
import numpy as np | |
import av | |
quantization_config = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_compute_dtype=torch.float16 | |
) | |
model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf' | |
processor = LlavaNextVideoProcessor.from_pretrained(model_name) | |
model = LlavaNextVideoForConditionalGeneration.from_pretrained( | |
model_name, | |
quantization_config=quantization_config, | |
device_map='auto' | |
) | |
def read_video_pyav(container, indices): | |
''' | |
Decode the video with PyAV decoder. | |
Args: | |
container (av.container.input.InputContainer): PyAV container. | |
indices (List[int]): List of frame indices to decode. | |
Returns: | |
np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). | |
''' | |
frames = [] | |
container.seek(0) | |
start_index = indices[0] | |
end_index = indices[-1] | |
for i, frame in enumerate(container.decode(video=0)): | |
if i > end_index: | |
break | |
if i >= start_index and i in indices: | |
frames.append(frame) | |
return np.stack([x.to_ndarray(format="rgb24") for x in frames]) | |
def process_video(video_file, question): | |
# Open video and sample frames | |
with av.open(video_file) as container: | |
total_frames = container.streams.video[0].frames | |
indices = np.arange(0, total_frames, total_frames / 8).astype(int) | |
video_clip = read_video_pyav(container, indices) | |
# Prepare conversation | |
conversation = [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": f"{question}"}, | |
{"type": "video"}, | |
], | |
}, | |
] | |
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) | |
# Prepare inputs for the model | |
input = processor([prompt], videos=[video_clip], padding=True, return_tensors="pt").to(model.device) | |
# Generate output | |
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} | |
output = model.generate(**input, **generate_kwargs) | |
generated_text = processor.batch_decode(output, skip_special_tokens=True)[0] | |
return generated_text.split("ASSISTANT: ", 1)[-1].strip() | |
# Define Gradio interface | |
def gradio_interface(video, question): | |
return process_video(video, question) | |
iface = gr.Interface( | |
fn=gradio_interface, | |
inputs=[ | |
gr.Video(label="Upload Video"), | |
gr.Textbox(label="Enter Question") | |
], | |
outputs=gr.Textbox(label="Generated Answer"), | |
title="Video Question Answering", | |
description="Upload a video and enter a question to get a generated text response." | |
) | |
if __name__ == "__main__": | |
iface.launch(debug=True) | |