Spaces:
Sleeping
Sleeping
File size: 5,218 Bytes
ba1eb4b b54618b ba1eb4b b54618b ba1eb4b 9178374 402c2c1 9178374 402c2c1 9178374 402c2c1 ba1eb4b 9178374 ba1eb4b 9178374 ba1eb4b eba62a3 b54618b ba1eb4b 9178374 ba1eb4b 9178374 ba1eb4b eba62a3 ba1eb4b 9178374 b54618b a21637d df2ba9f a21637d 9178374 df2ba9f a21637d 9178374 df2ba9f a21637d df2ba9f ba1eb4b df2ba9f b54618b ba1eb4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch
import numpy as np
import av
import gc
import spaces
import gradio as gr
import os
import json
import csv
import io
# Model setup: everything below runs once, when the module is imported.

# Checkpoint used for both the processor and the model weights.
model_name = 'llava-hf/LLaVA-NeXT-Video-7B-DPO-hf'

# Load the weights in 4-bit form and run the computation in float16.
quantization_config = BitsAndBytesConfig(
    bnb_4bit_compute_dtype=torch.float16,
    load_in_4bit=True,
)

# The processor builds prompts / model inputs and decodes generated tokens.
processor = LlavaNextVideoProcessor.from_pretrained(model_name)

# The quantized model; `device_map='auto'` leaves device placement to the loader.
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    model_name,
    device_map='auto',
    quantization_config=quantization_config,
)
def read_video_pyav(container, indices):
    """Decode the requested frames of a video with the PyAV decoder.

    Args:
        container: An opened container object exposing ``seek(offset)`` and
            ``decode(video=0)``; the latter must yield frames that provide
            ``to_ndarray(format="rgb24")``.
        indices: Non-empty sequence (list or array) of 0-based frame
            positions to keep. Repeated positions are collected only once,
            and the order of ``indices`` does not matter.

    Returns:
        A NumPy array made by stacking the selected frames along a new
        leading axis, in decode order.

    Raises:
        IndexError: If ``indices`` is empty.
        ValueError: If none of the requested frames could be decoded.
    """
    # Build the lookup set once: membership tests on an array would rescan
    # every index for each decoded frame.
    wanted = {int(i) for i in indices}
    if not wanted:
        raise IndexError("indices must contain at least one frame index")
    last_wanted = max(wanted)

    # Always decode from the beginning so frame positions are absolute.
    container.seek(0)

    frames = []
    for position, frame in enumerate(container.decode(video=0)):
        if position > last_wanted:
            # Everything requested has been seen; skip the rest of the stream.
            break
        if position in wanted:
            frames.append(frame.to_ndarray(format="rgb24"))

    if not frames:
        raise ValueError("none of the requested frame indices could be decoded")
    return np.stack(frames)
@spaces.GPU
def process_video(video_file, question, num_frames=8):
    """Ask the model one question about one video and return its reply.

    Args:
        video_file: Either an object whose ``name`` attribute is the path of
            the video file, or the path itself.
        question: Text of the question to put to the model.
        num_frames: How many evenly spaced frame positions to sample from
            the video (default 8, the value previously hard-coded).

    Returns:
        The decoded model output that follows the first ``"ASSISTANT:"``
        marker, with surrounding whitespace removed.

    Raises:
        ValueError: If ``num_frames`` is smaller than 1 or the video yields
            no decodable frames.
    """
    if num_frames < 1:
        raise ValueError("num_frames must be at least 1")

    # Accept both an uploaded-file object exposing `.name` and a plain path.
    video_path = getattr(video_file, "name", video_file)

    with av.open(video_path) as container:
        total_frames = container.streams.video[0].frames
        if total_frames <= 0:
            # The stream did not report a usable frame count; count frames by
            # decoding once. read_video_pyav seeks back to the start itself.
            total_frames = sum(1 for _ in container.decode(video=0))
        if total_frames <= 0:
            raise ValueError(f"no decodable video frames in {video_path!r}")

        # Evenly spaced positions floor(i * total / num_frames); for short
        # videos the same position can appear more than once.
        indices = np.arange(num_frames) * total_frames // num_frames
        video_clip = read_video_pyav(container, indices)

    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": f"{question}"},
                {"type": "video"},
            ],
        },
    ]
    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    inputs = processor(
        [prompt], videos=[video_clip], padding=True, return_tensors="pt"
    ).to(model.device)

    # Sampling is enabled, so repeated calls may return different text.
    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    # Disable gradient calculation during inference
    with torch.no_grad():
        output = model.generate(**inputs, **generate_kwargs)

    generated_text = processor.batch_decode(output, skip_special_tokens=True)[0]

    # Split on the marker without a trailing space: if the reply is empty the
    # decoded text may end right after "ASSISTANT:", and matching the space
    # would then return the whole prompt (question included) as the answer.
    return generated_text.split("ASSISTANT:", 1)[-1].strip()
def _is_affirmative(answer):
    """Return True when *answer* contains "yes" as a standalone word."""
    import re

    # A plain substring test would also match words such as "eyes".
    return re.search(r"\byes\b", answer, flags=re.IGNORECASE) is not None


def _results_to_csv(all_results, question_keys):
    """Render per-video results as CSV text with one column per question key."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Video File"] + list(question_keys))
    for video_name, results in all_results.items():
        # Questions that were not asked for a video are left blank.
        writer.writerow([video_name] + [results.get(key, "") for key in question_keys])
    return buffer.getvalue()


@spaces.GPU
def analyze_videos(video_files, selected_questions):
    """Run the selected questions on every video and report the results.

    Args:
        video_files: Iterable of uploaded videos; each item is either an
            object whose ``name`` attribute is the file path or the path
            itself. ``None`` is treated as "no videos".
        selected_questions: Iterable of question keys to ask; each must be
            one of ``hands_free``, ``standing``,
            ``interaction_with_background`` or ``indoors``. ``None`` is
            treated as "no questions".

    Returns:
        A ``(json_output, csv_content)`` tuple of strings. The JSON maps
        each video's base file name to ``{question_key: "true" | "false"}``;
        the CSV has a ``Video File`` column followed by one column per known
        question, blank where a question was not selected.

    Raises:
        KeyError: If ``selected_questions`` contains an unknown key.
    """
    questions = {
        "hands_free": "Is the subject's hand in the video free or not?",
        "standing": "Is the subject in the video sitting or standing?",
        "interaction_with_background": "Assess the surroundings behind the subject in the video. Do they seem to interact with any visible screens, such as laptops, TVs, or digital billboards? If yes, then they are interacting with a screen. If not, they are not interacting with a screen.",
        "indoors": "Consider the broader environmental context shown in the video’s background. Are there signs of an open-air space, like greenery, structures, or people passing by? If so, it’s an outdoor setting. If the setting looks confined with furniture, walls, or home decorations, it’s an indoor environment.",
    }
    # NOTE(review): several prompts are not phrased as yes/no questions, yet
    # every answer is reduced to whether it contains the word "yes" - confirm
    # this mapping is what is intended.

    all_results = {}
    for video_file in video_files or []:
        # Results are keyed by base name, so two uploads that share a file
        # name end up in the same entry.
        video_path = getattr(video_file, "name", video_file)
        video_name = os.path.basename(video_path)
        all_results[video_name] = {}

        for question_key in selected_questions or []:
            answer = process_video(video_file, questions[question_key])
            all_results[video_name][question_key] = (
                "true" if _is_affirmative(answer) else "false"
            )

        # Clear cache and collect garbage after each video
        gc.collect()
        torch.cuda.empty_cache()

    csv_content = _results_to_csv(all_results, questions)

    # Return both JSON and CSV
    json_output = json.dumps(all_results, indent=4)
    return json_output, csv_content
def download_csv(csv_content):
    """Write the CSV text to a temporary file and return that file's path.

    Args:
        csv_content: CSV text, as shown in the "CSV Results" textbox.

    Returns:
        Path of a freshly written ``video_analysis.csv`` inside a new
        temporary directory, or ``None`` when there is no content yet.
    """
    import tempfile

    if not csv_content:
        # Nothing has been analysed yet, so there is no file to offer.
        return None

    # A file component needs a path on disk rather than raw text; a dedicated
    # directory lets the file keep a fixed, readable name.
    export_dir = tempfile.mkdtemp(prefix="video_analysis_")
    csv_path = os.path.join(export_dir, "video_analysis.csv")
    # newline="" keeps the line endings already present in the CSV text.
    with open(csv_path, "w", encoding="utf-8", newline="") as handle:
        handle.write(csv_content)
    return csv_path


# Define Gradio interface
with gr.Blocks() as iface:
    with gr.Row():
        file_input = gr.File(label="Upload Videos", file_count="multiple")
        question_input = gr.CheckboxGroup(
            ["hands_free", "standing", "interaction_with_background", "indoors"],
            label="Select Questions to Apply",
        )
    process_button = gr.Button("Process Videos")
    with gr.Row():
        json_output = gr.JSON(label="Analysis Results (JSON)")
        csv_output = gr.Textbox(label="CSV Results", lines=15)
    download_button = gr.Button("Download CSV")
    # Receives the file written by download_csv; a button cannot carry a
    # file, so the download needs its own output component.
    csv_file_output = gr.File(label="CSV File", interactive=False)

    # Link buttons to their respective functions
    process_button.click(
        analyze_videos,
        inputs=[file_input, question_input],
        outputs=[json_output, csv_output],
    )
    download_button.click(download_csv, inputs=csv_output, outputs=csv_file_output)

if __name__ == "__main__":
    iface.launch(debug=True)