Qwen2-VL-7B-Instruct / handler.py
hperkins's picture
Update handler.py
078e469 verified
raw
history blame
7.2 kB
import base64
import io
import logging
import os
import shutil
import subprocess
import traceback # For formatting exception tracebacks
from typing import Dict, Any

import requests
import torch
from modelscope import snapshot_download
from moviepy.editor import VideoFileClip
from PIL import Image
from qwen_vl_utils import process_vision_info
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
class EndpointHandler:
    """
    Handler class for the Qwen2-VL-7B-Instruct model on Hugging Face Inference Endpoints.

    The handler accepts a single prompt string in which text is interleaved with
    media references delimited by ``<image>`` markers, e.g.::

        "Describe this: <image>https://example.com/cat.png<image> in one sentence."

    Segments at even positions (0, 2, ...) are plain text; segments at odd
    positions are media references. A media reference is either an image
    (``http(s)://`` URL or ``data:image...`` base64 URI) or a video written as
    ``video:<path-or-url>``.

    It includes a runtime workaround to install FFmpeg if it's not available
    in the environment.
    """

    # Upper bound (seconds) on a remote image download so that one slow host
    # cannot block a request indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    # Prefix that marks a media segment as a video reference.
    _VIDEO_PREFIX = "video:"

    def __init__(self, path=""):
        """
        Initializes the handler, makes sure FFmpeg is present, and loads the model.

        Args:
            path (str, optional): The path to the Qwen2-VL model directory. Defaults to "".
        """
        self.model_dir = path
        self._ensure_ffmpeg()
        # Load the Qwen2-VL model and its matching processor.
        self.model = Qwen2VLForConditionalGeneration.from_pretrained(
            self.model_dir, torch_dtype="auto", device_map="auto"
        )
        self.processor = AutoProcessor.from_pretrained(self.model_dir)

    @staticmethod
    def _ensure_ffmpeg():
        """
        Best-effort installation of FFmpeg through apt-get.

        Nothing is installed when an ``ffmpeg`` executable is already on PATH.
        Installation failures are logged and never raised, so the handler can
        still serve text and image requests.
        """
        if shutil.which("ffmpeg"):
            logging.info("FFmpeg already available; skipping installation.")
            return
        try:
            subprocess.run(["apt-get", "update"], check=True)
            subprocess.run(["apt-get", "install", "-y", "ffmpeg"], check=True)
            logging.info("FFmpeg installed successfully.")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing apt-get binary or insufficient
            # permissions, which subprocess raises before the command runs.
            logging.error(f"Error installing FFmpeg: {e}")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Processes the input data and returns the Qwen2-VL model's output.

        Args:
            data (Dict[str, Any]): A dictionary containing the input data.
                - "inputs" (str): The input text, including image/video references.
                - "max_new_tokens" (int, optional): Max tokens to generate (default: 128).

        Returns:
            Dict[str, Any]: A dictionary with the key "generated_text".

        Raises:
            ValueError: If "inputs" is missing or is not a string.
        """
        prompt = data.get("inputs")
        if not isinstance(prompt, str):
            raise ValueError(
                '"inputs" must be a string containing the prompt text and any '
                "<image>-delimited media references."
            )
        max_new_tokens = data.get("max_new_tokens", 128)

        # Construct the messages list from the input string.
        messages = [{"role": "user", "content": self._parse_input(prompt)}]

        # Prepare for inference (using qwen_vl_utils).
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        logging.debug("Image inputs: %s", image_inputs)
        logging.debug("Video inputs: %s", video_inputs)
        model_inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        # Follow the device the model was actually placed on instead of
        # guessing from CUDA availability.
        model_inputs = model_inputs.to(self.model.device)

        # Inference
        generated_ids = self.model.generate(**model_inputs, max_new_tokens=max_new_tokens)
        # generate() returns prompt + completion; drop the prompt tokens.
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        output_text = self.processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        return {"generated_text": output_text}

    def _parse_input(self, input_string):
        """
        Parses the input string to identify image/video references and text.

        The string is split on ``<image>``; even-indexed segments become text
        entries and odd-indexed segments are treated as media references.
        Media that cannot be loaded is skipped (the loaders log the reason).

        Args:
            input_string (str): The input string containing text, image, and video references.

        Returns:
            list: A list of dictionaries representing the parsed content.
        """
        content = []
        for index, segment in enumerate(input_string.split("<image>")):
            segment = segment.strip()
            if index % 2 == 0:  # Text part
                content.append({"type": "text", "text": segment})
                continue

            # Image/video part
            if segment.lower().startswith(self._VIDEO_PREFIX):
                # Slice the prefix off instead of splitting on it: the prefix
                # test is case-insensitive and the path may itself contain
                # the text "video:".
                video_path = segment[len(self._VIDEO_PREFIX):].strip()
                logging.debug("Video path: %s", video_path)
                video_frames = self._extract_video_frames(video_path)
                logging.debug(
                    "Number of frames extracted: %d",
                    len(video_frames) if video_frames else 0,
                )
                if video_frames:
                    content.append({"type": "video", "video": video_frames, "fps": 1})
            else:
                image = self._load_image(segment)
                if image:
                    content.append({"type": "image", "image": image})
        return content

    def _load_image(self, image_data):
        """
        Loads an image from a URL or base64 encoded string.

        Args:
            image_data (str): The image data, either an http(s) URL or a
                ``data:image...`` URI whose payload follows the first comma.

        Returns:
            PIL.Image.Image or None: The loaded image, or None if loading fails.
        """
        if image_data.startswith(("http://", "https://")):
            try:
                response = requests.get(image_data, timeout=self._REQUEST_TIMEOUT_SECONDS)
                # Fail on 4xx/5xx rather than trying to decode an error page.
                response.raise_for_status()
                # .content is already content-decoded (gzip/deflate), unlike .raw.
                return Image.open(io.BytesIO(response.content))
            except Exception as e:
                logging.error(f"Error loading image from URL: {e}")
                return None

        if image_data.startswith("data:image"):
            try:
                _, separator, payload = image_data.partition(",")
                if not separator:
                    raise ValueError("data URI has no ',' between header and payload")
                return Image.open(io.BytesIO(base64.b64decode(payload)))
            except Exception as e:
                logging.error(f"Error loading image from base64: {e}")
                return None

        logging.error("Invalid image data format. Must be URL or base64 encoded.")
        return None

    def _extract_video_frames(self, video_path, fps=1):
        """
        Extracts frames from a video at the specified FPS using MoviePy.

        Args:
            video_path (str): The path or URL of the video file.
            fps (int, optional): The desired frames per second. Defaults to 1.

        Returns:
            list or None: A list of PIL Images representing the extracted frames,
            or None if extraction fails.
        """
        video = None
        try:
            logging.debug("Attempting to load video from: %s", video_path)
            video = VideoFileClip(video_path)
            logging.debug("Video loaded: %s", video)
            frames = [
                Image.fromarray(frame.astype('uint8'), 'RGB')
                for frame in video.iter_frames(fps=fps)
            ]
            logging.debug("Number of frames: %d", len(frames))
            if frames:
                logging.debug("Frame type: %s", type(frames[0]))
                logging.debug("Frame size: %s", frames[0].size)
            return frames
        except Exception as e:
            error_message = f"Error extracting video frames: {e}\n{traceback.format_exc()}"
            logging.error(error_message)  # Log the formatted error message
            return None
        finally:
            # Release the underlying reader even when frame iteration fails.
            if video is not None:
                video.close()