|
|
import spaces |
|
|
import subprocess |
|
|
|
|
|
# Install flash-attn at start-up, before the model code below needs it.
# NOTE(review): FLASH_ATTENTION_SKIP_CUDA_BUILD presumably tells the flash-attn
# installer not to compile its CUDA kernels locally -- confirm against the
# flash-attn packaging docs.
import os

subprocess.run(
    "pip install flash-attn --no-build-isolation",
    # Extend the inherited environment instead of replacing it: a bare dict
    # passed as ``env`` would strip PATH, HOME, proxy settings, etc. from the
    # child process, which can stop the shell from finding ``pip`` at all.
    env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
    shell=True,
)
|
|
import time |
|
|
import logging |
|
|
import gradio as gr |
|
|
import cv2 |
|
|
import os |
|
|
from transformers import AutoProcessor, AutoModelForImageTextToText |
|
|
import torch |
|
|
from PIL import Image |
|
|
import numpy as np |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
# Layout of an empty cache entry; every slot starts out as None.
default_cache = dict.fromkeys(('model_id', 'processor', 'model', 'device'))

# Live cache of the most recently loaded processor/model pair. It is a
# separate dict object so that filling it never touches the template above.
model_cache = dict(default_cache)

# True only when this torch build exposes an ``xpu`` backend and that
# backend reports itself as available.
has_xpu = torch.xpu.is_available() if hasattr(torch, 'xpu') else False
|
|
|
|
|
def update_model(model_id, device):
    """Make sure ``model_cache`` holds ``model_id`` loaded for ``device``.

    Nothing happens when the cached model id and device already match the
    request. Otherwise the processor and model are loaded, the model is put
    in eval mode, and all four cache slots are overwritten.

    Args:
        model_id: Checkpoint identifier handed to ``from_pretrained``.
        device: Requested device name. ``'cuda'`` loads in bfloat16 with
            flash-attention 2; ``'xpu'`` (only when ``has_xpu`` is true)
            loads in float32; every other value loads on the CPU.

    Raises:
        Exception: Any error raised while loading is logged and re-raised.
    """
    already_loaded = (
        model_cache['model_id'] == model_id and model_cache['device'] == device
    )
    if already_loaded:
        return

    logging.info(f'Loading model {model_id} on {device}')
    try:
        processor = AutoProcessor.from_pretrained(model_id)

        # Choose the placement target and the loader options per backend.
        if device == 'cuda':
            target = 'cuda'
            load_kwargs = {
                'torch_dtype': torch.bfloat16,
                '_attn_implementation': 'flash_attention_2',
            }
        elif device == 'xpu' and has_xpu:
            target = 'xpu'
            load_kwargs = {'torch_dtype': torch.float32}
        else:
            # Also covers an 'xpu' request on a machine without XPU support.
            target = 'cpu'
            load_kwargs = {}

        model = AutoModelForImageTextToText.from_pretrained(
            model_id, **load_kwargs
        ).to(target)
        model.eval()

        # The cache records the *requested* device name, which is what the
        # staleness check at the top compares against.
        model_cache.update({
            'model_id': model_id,
            'processor': processor,
            'model': model,
            'device': device,
        })
    except Exception as e:
        logging.error(f'Error loading model: {e}')
        raise e
|
|
|
|
|
def extract_frames_from_video(video_path, max_frames=10):
    """Extract up to ``max_frames`` frames sampled at a fixed stride.

    The stride is ``frame_count // max_frames`` (at least 1), where
    ``frame_count`` is the frame total reported by OpenCV, so the selected
    frames are spread over the video instead of taken from its start only.

    Args:
        video_path: Path to the video file. It must exist and end in
            ``.mp4``, ``.avi``, ``.mov``, ``.mkv`` or ``.webm``
            (case-insensitive).
        max_frames: Upper bound on the number of frames returned; must be
            at least 1.

    Returns:
        A list of frames exactly as produced by ``cv2.VideoCapture.read``,
        in playback order. The list may be shorter than ``max_frames``.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        ValueError: If the extension is unsupported, ``max_frames`` is
            below 1, the file cannot be opened, or the reported frame
            count is zero.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    if not video_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')):
        raise ValueError("Unsupported video format. Please use MP4, AVI, MOV, MKV, or WEBM.")

    # Reject bad limits up front: zero would divide by zero when the stride
    # is computed, and a negative value would silently yield one frame.
    if max_frames < 1:
        raise ValueError(f"max_frames must be at least 1, got {max_frames}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_count == 0:
            raise ValueError("Video file appears to be empty or corrupted")

        step = max(1, frame_count // max_frames)

        frames = []
        frame_idx = 0
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a decode failure): keep what we have.
                break
            if frame_idx % step == 0:
                frames.append(frame)
            frame_idx += 1
        return frames
    finally:
        # Release the capture handle on every exit path, including errors.
        cap.release()
|
|
|
|
|
@spaces.GPU
def caption_frame(frame, model_id, interval_ms, sys_prompt, usr_prompt, device):
    """Caption a single frame (used for webcam streaming).

    Args:
        frame: Image array delivered by the streaming webcam component.
        model_id: Checkpoint to use; loaded on demand via ``update_model``.
        interval_ms: Delay in milliseconds applied before the frame is
            processed, which throttles the stream.
        sys_prompt: Text of the system message.
        usr_prompt: Text of the user message that accompanies the image.
        device: Device name forwarded to ``update_model``.

    Returns:
        A ``(caption, debug_log)`` tuple of strings. When anything fails the
        caption is ``"Error: <message>"`` and the log holds the timings that
        were collected before the failure.
    """
    # Local import so this function does not rely on a module-level import.
    import tempfile

    debug_msgs = []
    temp_path = None
    try:
        update_model(model_id, device)
        processor = model_cache['processor']
        model = model_cache['model']

        # Throttle: wait out the requested interval before doing any work.
        time.sleep(interval_ms / 1000)

        # --- Preprocess ---------------------------------------------------
        t0 = time.time()
        # Gradio's image component hands over RGB arrays (see its docs), so
        # the BGR->RGB swap needed for OpenCV-decoded frames would invert
        # the red and blue channels here. ``convert`` guarantees a mode the
        # JPEG encoder accepts.
        pil_img = Image.fromarray(frame).convert('RGB')
        # A unique temp file avoids clashes between concurrent sessions that
        # a fixed name in the working directory would cause.
        fd, temp_path = tempfile.mkstemp(prefix='frame_', suffix='.jpg')
        os.close(fd)  # the image is written by path; the descriptor is unused
        pil_img.save(temp_path, format='JPEG', quality=50)
        debug_msgs.append(f'Preprocess: {int((time.time()-t0)*1000)} ms')

        messages = [
            {'role': 'system', 'content': [{'type': 'text', 'text': sys_prompt}]},
            {'role': 'user', 'content': [
                {'type': 'image', 'url': temp_path},
                {'type': 'text', 'text': usr_prompt},
            ]},
        ]

        # --- Tokenize -----------------------------------------------------
        t1 = time.time()
        inputs = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors='pt',
        )

        # Move every tensor to the model's device; floating-point tensors
        # are additionally cast to the model's parameter dtype, while other
        # tensors keep their dtype.
        param_dtype = next(model.parameters()).dtype
        cast_inputs = {}
        for k, v in inputs.items():
            if not isinstance(v, torch.Tensor):
                cast_inputs[k] = v
            elif v.dtype.is_floating_point:
                cast_inputs[k] = v.to(device=model.device, dtype=param_dtype)
            else:
                cast_inputs[k] = v.to(device=model.device)
        inputs = cast_inputs
        debug_msgs.append(f'Tokenize: {int((time.time()-t1)*1000)} ms')

        # --- Inference ----------------------------------------------------
        t2 = time.time()
        outputs = model.generate(**inputs, do_sample=False, max_new_tokens=128)
        debug_msgs.append(f'Inference: {int((time.time()-t2)*1000)} ms')

        # --- Decode -------------------------------------------------------
        t3 = time.time()
        raw = processor.batch_decode(outputs, skip_special_tokens=True)[0]
        debug_msgs.append(f'Decode: {int((time.time()-t3)*1000)} ms')

        # Keep only the reply: the text after the last "Assistant:" marker,
        # or failing that the last line of a multi-line output.
        if "Assistant:" in raw:
            caption = raw.split("Assistant:")[-1].strip()
        else:
            lines = raw.splitlines()
            caption = lines[-1].strip() if len(lines) > 1 else raw.strip()

        return caption, '\n'.join(debug_msgs)
    except Exception as e:
        return f"Error: {str(e)}", '\n'.join(debug_msgs)
    finally:
        # Remove the temp image on success and on failure alike.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logging.warning(f"Failed to cleanup {temp_path}: {cleanup_error}")
|
|
|
|
|
@spaces.GPU
def process_video_file(video_file, model_id, sys_prompt, usr_prompt, device, max_frames):
    """Process an uploaded video file and caption several of its frames.

    Args:
        video_file: Path of the uploaded video, or ``None`` when nothing
            was uploaded.
        model_id: Checkpoint to use; loaded on demand via ``update_model``.
        sys_prompt: Text of the system message.
        usr_prompt: Text of the user message that accompanies each frame.
        device: Device name forwarded to ``update_model``.
        max_frames: Maximum number of frames to extract and caption.

    Returns:
        A ``(captions, debug_log)`` tuple of strings. ``captions`` holds one
        ``"Frame N: ..."`` entry per frame separated by blank lines, or a
        message describing why nothing was captioned.
    """
    # Local import so this function does not rely on a module-level import.
    import tempfile

    if video_file is None:
        return "No video file uploaded", ""

    debug_msgs = []
    temp_files = []

    try:
        update_model(model_id, device)
        processor = model_cache['processor']
        model = model_cache['model']

        t0 = time.time()
        frames = extract_frames_from_video(video_file, max_frames)
        debug_msgs.append(f'Extracted {len(frames)} frames in {int((time.time()-t0)*1000)} ms')

        if not frames:
            return "No frames could be extracted from the video", '\n'.join(debug_msgs)

        # The parameter dtype is the same for every frame: look it up once.
        param_dtype = next(model.parameters()).dtype

        captions = []
        for i, frame in enumerate(frames):
            t1 = time.time()

            # Frames decoded by OpenCV are BGR; PIL expects RGB.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(rgb)
            # Unique temp files avoid clashes between concurrent requests
            # that predictable names in the working directory would cause.
            fd, temp_path = tempfile.mkstemp(prefix=f'frame_{i}_', suffix='.jpg')
            os.close(fd)  # the image is written by path; the descriptor is unused
            temp_files.append(temp_path)
            pil_img.save(temp_path, format='JPEG', quality=50)

            messages = [
                {'role': 'system', 'content': [{'type': 'text', 'text': sys_prompt}]},
                {'role': 'user', 'content': [
                    {'type': 'image', 'url': temp_path},
                    {'type': 'text', 'text': usr_prompt},
                ]},
            ]

            inputs = processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                return_tensors='pt',
            )

            # Move every tensor to the model's device; floating-point
            # tensors are additionally cast to the parameter dtype.
            cast_inputs = {}
            for k, v in inputs.items():
                if not isinstance(v, torch.Tensor):
                    cast_inputs[k] = v
                elif v.dtype.is_floating_point:
                    cast_inputs[k] = v.to(device=model.device, dtype=param_dtype)
                else:
                    cast_inputs[k] = v.to(device=model.device)
            inputs = cast_inputs

            outputs = model.generate(**inputs, do_sample=False, max_new_tokens=128)

            # Keep only the reply: the text after the last "Assistant:"
            # marker, or failing that the last line of a multi-line output.
            raw = processor.batch_decode(outputs, skip_special_tokens=True)[0]
            if "Assistant:" in raw:
                caption = raw.split("Assistant:")[-1].strip()
            else:
                lines = raw.splitlines()
                caption = lines[-1].strip() if len(lines) > 1 else raw.strip()

            captions.append(f"Frame {i+1}: {caption}")
            debug_msgs.append(f'Frame {i+1} processed in {int((time.time()-t1)*1000)} ms')

        return '\n\n'.join(captions), '\n'.join(debug_msgs)

    except Exception as e:
        return f"Error processing video: {str(e)}", '\n'.join(debug_msgs)
    finally:
        # Best-effort removal of every temp image, on success and failure.
        for temp_file in temp_files:
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                except OSError as cleanup_error:
                    logging.warning(f"Failed to cleanup {temp_file}: {cleanup_error}")
|
|
|
|
|
def toggle_input_mode(input_mode):
    """Return visibility updates for switching between webcam and file input.

    Args:
        input_mode: Selected mode; ``"Webcam"`` selects the webcam, any
            other value selects video-file input.

    Returns:
        Three ``gr.update`` objects. The first is visible only in webcam
        mode; the second and third are visible only in the other mode.
    """
    webcam_selected = input_mode == "Webcam"
    file_selected = not webcam_selected
    return (
        gr.update(visible=webcam_selected),
        gr.update(visible=file_selected),
        gr.update(visible=file_selected),
    )
|
|
|
|
|
def main():
    """Build the Gradio interface, wire its events, and launch the app."""
    logging.basicConfig(level=logging.INFO)

    checkpoints = [
        'HuggingFaceTB/SmolVLM2-256M-Video-Instruct',
        'HuggingFaceTB/SmolVLM2-500M-Video-Instruct',
        'HuggingFaceTB/SmolVLM2-2.2B-Instruct',
    ]

    # Offer only the devices this machine reports; CPU is always listed.
    cuda_ok = torch.cuda.is_available()
    devices = ['cpu']
    devices += ['cuda'] if cuda_ok else []
    devices += ['xpu'] if has_xpu else []

    # Default preference order: CUDA, then XPU, then CPU.
    if cuda_ok:
        preferred_device = 'cuda'
    elif has_xpu:
        preferred_device = 'xpu'
    else:
        preferred_device = 'cpu'

    def switch_control_rows(mode):
        """Show the slider row that belongs to ``mode`` and hide the other."""
        return (
            gr.update(visible=mode == "Webcam"),
            gr.update(visible=mode == "Video File"),
        )

    with gr.Blocks() as demo:
        gr.Markdown('## 🎥 Real-Time Webcam & Video File Captioning with SmolVLM2 (Transformers)')

        with gr.Row():
            input_mode = gr.Radio(
                choices=["Webcam", "Video File"],
                value="Webcam",
                label="Input Mode",
            )

        with gr.Row():
            model_dd = gr.Dropdown(checkpoints, value=checkpoints[0], label='Model ID')
            device_dd = gr.Dropdown(devices, value=preferred_device, label='Device')

        # Controls that only apply to webcam streaming.
        with gr.Row() as webcam_controls:
            interval = gr.Slider(100, 20000, step=100, value=3000, label='Interval (ms)')

        # Controls that only apply to uploaded videos (hidden at start).
        with gr.Row(visible=False) as video_controls:
            max_frames = gr.Slider(1, 20, step=1, value=5, label='Max Frames to Process')

        sys_p = gr.Textbox(lines=2, value='Describe the key action', label='System Prompt')
        usr_p = gr.Textbox(lines=1, value='What is happening in this image?', label='User Prompt')

        # Input widgets: the webcam is shown first, the upload widgets are hidden.
        cam = gr.Image(sources=['webcam'], streaming=True, label='Webcam Feed')
        video_file = gr.File(
            label="Upload Video File",
            file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
            visible=False,
        )
        process_btn = gr.Button("Process Video", visible=False)

        # Outputs shared by both input modes.
        caption_tb = gr.Textbox(interactive=False, label='Caption')
        log_tb = gr.Textbox(lines=4, interactive=False, label='Debug Log')

        # Changing the mode swaps the input widgets ...
        input_mode.change(
            fn=toggle_input_mode,
            inputs=[input_mode],
            outputs=[cam, video_file, process_btn],
        )

        # ... and the matching row of controls.
        input_mode.change(
            fn=switch_control_rows,
            inputs=[input_mode],
            outputs=[webcam_controls, video_controls],
        )

        # Each streamed webcam frame is captioned individually.
        cam.stream(
            fn=caption_frame,
            inputs=[cam, model_dd, interval, sys_p, usr_p, device_dd],
            outputs=[caption_tb, log_tb],
            time_limit=600,
        )

        # Uploaded videos are processed on demand.
        process_btn.click(
            fn=process_video_file,
            inputs=[video_file, model_dd, sys_p, usr_p, device_dd, max_frames],
            outputs=[caption_tb, log_tb],
        )

    demo.queue()
    demo.launch()
|
|
|
|
|
|
|
|
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()