Qwen2-VL-7B-Instruct / handler.py
hperkins's picture
Update handler.py
acc9b5d verified
raw
history blame
5.11 kB
from typing import Dict, List, Any
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from modelscope import snapshot_download
from qwen_vl_utils import process_vision_info
import torch
import os
import base64
import io
from PIL import Image
import ffmpeg
import logging
import requests
class EndpointHandler():
def __init__(self, path=""):
self.model_dir = path
self.model = Qwen2VLForConditionalGeneration.from_pretrained(
self.model_dir, torch_dtype="auto", device_map="auto"
)
self.processor = AutoProcessor.from_pretrained(self.model_dir)
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
data args:
inputs (str): The input text, including any image or video references.
max_new_tokens (int, optional): Maximum number of tokens to generate. Defaults to 128.
Return:
A dictionary containing the generated text.
"""
inputs = data.get("inputs")
max_new_tokens = data.get("max_new_tokens", 128)
# Construct the messages list from the input string
messages = [{"role": "user", "content": self._parse_input(inputs)}]
text = self.processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = self.processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda" if torch.cuda.is_available() else "cpu")
generated_ids = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = self.processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0] # Return a single string
return {"generated_text": output_text}
def _parse_input(self, input_string):
"""Parses the input string to identify image/video references and text."""
content = []
parts = input_string.split("<image>")
for i, part in enumerate(parts):
if i % 2 == 0: # Text part
content.append({"type": "text", "text": part.strip()})
else: # Image/video part
if part.startswith("video:"):
video_path = part.split("video:")[1].strip()
video_frames = self._extract_video_frames(video_path)
if video_frames:
content.append({"type": "video", "video": video_frames, "fps": 1}) # Add fps
else:
image = self._load_image(part.strip())
if image:
content.append({"type": "image", "image": image})
return content
def _load_image(self, image_data):
"""Loads an image from a URL or base64 encoded string."""
if image_data.startswith("http"):
try:
image = Image.open(requests.get(image_data, stream=True).raw)
except Exception as e:
logging.error(f"Error loading image from URL: {e}")
return None
elif image_data.startswith("data:image"):
try:
image_data = image_data.split(",")[1]
image_bytes = base64.b64decode(image_data)
image = Image.open(io.BytesIO(image_bytes))
except Exception as e:
logging.error(f"Error loading image from base64: {e}")
return None
else:
logging.error("Invalid image data format. Must be URL or base64 encoded.")
return None
return image
def _extract_video_frames(self, video_path, fps=1):
"""Extracts frames from a video at the specified FPS."""
try:
probe = ffmpeg.probe(video_path)
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
if not video_stream:
logging.error(f"No video stream found in {video_path}")
return None
width = int(video_stream['width'])
height = int(video_stream['height'])
out, _ = (
ffmpeg
.input(video_path)
.filter('fps', fps=fps)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run(capture_stdout=True)
)
frames = []
for i in range(0, len(out), width * height * 3):
frame_data = out[i:i + width * height * 3]
frame = Image.frombytes('RGB', (width, height), frame_data)
frames.append(frame)
return frames
except ffmpeg.Error as e:
logging.error(f"Error extracting video frames: {e}")
return None