import os
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple, Union

from tqdm import tqdm
from transformers import AutoProcessor

from lmms_eval.api.instance import Instance
from lmms_eval.api.registry import register_model
from lmms_eval.models.chat.vllm import VLLM as VLLMChat
from lmms_eval.models.model_utils.gen_metrics import log_metrics
from lmms_eval.protocol import ChatMessages

try:
    from vllm import LLM, SamplingParams
except ImportError:
    LLM = None
    SamplingParams = None

|
|
from qwen_vl_utils import fetch_video, process_vision_info |
|
|
|
|
|
WORKERS = int(os.getenv("WORKERS", "32")) |
|
|
|
|
|
|
|
|
@register_model("vllm_generate")
class VLLMGenerate(VLLMChat):
    """
    Unlike the chat variant, this model calls vLLM's `generate` method instead of `chat`.

    The input is a list of vLLM inputs and the output is a list of response strings.
    Each vLLM input is a dictionary with the following keys:
        - prompt: the text prompt for the model
        - multi_modal_data: the multi-modal data (images/videos) passed to the model
        - mm_processor_kwargs: kwargs forwarded to the multi-modal processor

    The vLLM inputs are built from each Instance. Building them here lets the processor
    handle video correctly, especially for the Qwen3-VL series.
    """
    is_simple = False

    def __init__(
        self,
        model="Qwen/Qwen2.5-VL-3B-Instruct",
        tensor_parallel_size=1,
        data_parallel_size=1,
        gpu_memory_utilization=0.8,
        batch_size=1,
        max_frame_num=768,
        trust_remote_code=True,
        chat_template=None,
        max_pixels: int = 1605632,
        min_image_pixels=28,
        fps: Optional[int] = None,
        nframes: Optional[int] = 32,
        **kwargs,
    ):
        super().__init__(model, tensor_parallel_size, data_parallel_size, gpu_memory_utilization, batch_size, max_frame_num, trust_remote_code, chat_template, max_pixels, min_image_pixels, fps, nframes, **kwargs)
        self.processor = AutoProcessor.from_pretrained(model)
        if self.chat_template is not None:
            with open(self.chat_template, "r") as f:
                chat_template = f.read()
            self.processor.chat_template = chat_template

    def make_one_request(self, request: Instance) -> Tuple[dict, dict]:
        """
        Build a vLLM generate input and per-request sampling params from an Instance.

        Returns (vllm_inputs, params). Does not mutate the input request.
        """
        ctx, doc_to_messages, gen_kwargs, doc_id, task, split = request.arguments
        raw_messages = doc_to_messages(self.task_dict[task][split][doc_id])
        chat_messages = ChatMessages(messages=raw_messages)

        _gen = dict(gen_kwargs or {})
        _gen.setdefault("max_new_tokens", 4096)
        _gen.setdefault("temperature", 0)
        _gen.setdefault("top_p", 0.95)

        params = {
            "temperature": _gen["temperature"],
            "max_tokens": _gen["max_new_tokens"],
            "top_p": _gen["top_p"],
        }

        video_kwargs = {
            "max_pixels": self.max_pixels,
            "min_pixels": self.min_image_pixels,
            "max_frames": self.max_frame_num,
        }
        if self.fps is not None:
            video_kwargs["fps"] = self.fps
        else:
            video_kwargs["nframes"] = self.nframes
        messages = chat_messages.to_hf_messages(video_kwargs=video_kwargs)
        images, videos, audios = chat_messages.extract_media()

        # Pre-sample video frames with qwen_vl_utils so that vLLM receives decoded
        # frames (plus metadata for Qwen3-VL) instead of raw video references.
        video_inputs = []
        video_metadatas = []
        kwargs = {}
        for video in videos:
            video_dict = {
                "type": "video",
                "video": video,
                **video_kwargs,
            }
            final_video, fps = fetch_video(video_dict, return_video_metadata=True, return_video_sample_fps=True)
            frames, video_metadata = final_video
            video_inputs.append(frames)
            video_metadatas.append(video_metadata)
            kwargs["fps"] = fps
            # Frames are already sampled above, so tell the processor not to resample.
            kwargs["do_sample_frames"] = False
        if len(videos) == 0:
            video_inputs = None
            video_metadatas = None
        if len(images) == 0:
            images = None
        if len(audios) == 0:
            audios = None

        text = self.processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

        vllm_inputs = {"prompt": text, "multi_modal_data": {}}
        if images is not None:
            vllm_inputs["multi_modal_data"]["image"] = images
        if video_inputs is not None:
            vllm_inputs["multi_modal_data"]["video"] = []
            for video_input, video_metadata in zip(video_inputs, video_metadatas):
                # Qwen3-VL processors expect (frames, metadata) tuples; others take frames only.
                if "Qwen3VL" in type(self.processor).__name__:
                    video_input = (video_input, video_metadata)
                vllm_inputs["multi_modal_data"]["video"].append(video_input)
            vllm_inputs["mm_processor_kwargs"] = {
                **kwargs,
            }

        return vllm_inputs, params

    def generate_until(self, requests) -> List[str]:
        self.load_cache()
        res, requests = self.get_response_from_cache(requests)
        pbar = tqdm(total=len(requests), disable=(self.rank != 0), desc="Model Responding")

        batch_size = self.batch_size_per_gpu
        batched_requests = [requests[i : i + batch_size] for i in range(0, len(requests), batch_size)]
        e2e_latency = 0
        for batch_requests in batched_requests:
            # Build vLLM inputs in parallel; all requests in a batch share the sampling
            # params of the last request in the batch.
            batched_vllm_inputs = []
            with ThreadPoolExecutor(max_workers=WORKERS) as executor:
                futures = [executor.submit(self.make_one_request, request) for request in batch_requests]
                for future in futures:
                    vllm_inputs, sampling_params = future.result()
                    batched_vllm_inputs.append(vllm_inputs)

            sampling_params = SamplingParams(**sampling_params)
            start_time = time.time()
            response = self.client.generate(batched_vllm_inputs, sampling_params)
            end_time = time.time()
            e2e_latency += end_time - start_time

            response_text = [o.outputs[0].text for o in response]
            for req, text in zip(batch_requests, response_text):
                self.add_request_response_to_cache(req, text)

            assert len(response_text) == len(batch_requests)
            res.extend(response_text)
            pbar.update(len(batch_requests))

        if not self.disable_log_stats:
            metrics = self.get_format_metrics()
            total_tokens = metrics["generation_tokens"]
            avg_speed = total_tokens / e2e_latency if e2e_latency > 0 else 0
            metric_dict = {
                "total_tokens": total_tokens,
                "e2e_latency": e2e_latency,
                "avg_speed": avg_speed,
                "additional_metrics": {
                    "ttft": metrics["ttft"],
                    "tpot": metrics["tpot"],
                    "rank": self.rank,
                },
            }
            log_metrics(**metric_dict)

        pbar.close()
        return res

    def loglikelihood(self, requests: List[Instance]) -> List[Tuple[float, bool]]:
        raise NotImplementedError("Loglikelihood is not supported for vllm_generate")

    def generate_until_multi_round(self, requests) -> List[str]:
        raise NotImplementedError("TODO: Implement multi-round generation")

    def get_format_metrics(self):
        """Aggregate TTFT, TPOT, and generated-token counts from vLLM's engine metrics."""
        metrics = self.client.get_metrics()
        ttft = 0
        tpot = 0
        generation_tokens = 0
        for metric in metrics:
            name = metric.name
            if "time_to_first_token" in name:
                ttft = metric.sum / metric.count
            if "time_per_output_token_seconds" in name:
                tpot = metric.sum / metric.count
            if name == "vllm:generation_tokens":
                generation_tokens = metric.value

        metrics = {
            "ttft": ttft,
            "tpot": tpot,
            "generation_tokens": generation_tokens,
        }

        return metrics
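

# Example invocation (a sketch; the exact lmms_eval CLI flags and task names may differ):
#
#   python -m lmms_eval \
#       --model vllm_generate \
#       --model_args model=Qwen/Qwen2.5-VL-3B-Instruct,tensor_parallel_size=1 \
#       --tasks <task_name> \
#       --batch_size 8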