# synthesis/ObjectRecognition/text_object_vllmdataparallel.py
# SPDX-License-Identifier: Apache-2.0
# usage:
# VLLM_USE_V1=1 python text_object_vllmdataparallel.py
# We need a launcher to create multiple data parallel ranks; each rank then
# creates its own vLLM instance to process its share of the prompts.
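#
# A fuller example invocation (illustrative only; the paths below are simply the
# argparse defaults from get_args and should be adapted to your environment):
#   VLLM_USE_V1=1 python text_object_vllmdataparallel.py \
#       --model Qwen2.5-VL-72B-Instruct-AWQ \
#       --DP_size 4 --GPUs_per_dp_rank 2 \
#       --start 0 --end 10000 \
#       --save_dir /share/minghao/VideoProjects/Sythesis/LongVideoCaption/CaptionResults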
import argparse
import gc
import json
import os
import random
import time
from multiprocessing import Process

import jsonlines
import psutil
import torch
from tqdm import tqdm
from transformers import AutoTokenizer

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port

from prompt import object_recognition_prompt_miradata, prompt_miradata_based_text

random.seed(42)
# Pool of prompt templates to sample from when building generation prompts.
prompt_generate = [prompt_miradata_based_text]
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--save_dir", type=str, default="/share/minghao/VideoProjects/Sythesis/LongVideoCaption/CaptionResults")
    parser.add_argument("--model", type=str, default="Qwen2.5-VL-72B-Instruct-AWQ")
    parser.add_argument("--GPUs_per_dp_rank", type=int, default=2)
    parser.add_argument("--DP_size", type=int, default=4)
    parser.add_argument("--start", type=int, default=0)
    parser.add_argument("--end", type=int, default=10000)
    parser.add_argument("--max_num_seqs", type=int, default=4)
    parser.add_argument("--max_model_len", type=int, default=32768)
    parser.add_argument("--max_tokens", type=int, default=8192)
    args = parser.parse_args()
    return args
def get_have_processed(save_dir):
    """Collect the clip_ids that already have results in save_dir so they can be skipped."""
    names = os.listdir(save_dir)
    paths = [os.path.join(save_dir, tmp) for tmp in names]
    record_video_id = []
    for path in paths:
        datas = load_jsonl(path)
        for data in datas:
            video_id = data['clip_id']
            if video_id not in record_video_id:
                record_video_id.append(video_id)
    return record_video_id
def load_jsonl(path):
    datas = []
    # Read the JSONL file line by line.
    with jsonlines.open(path, "r") as reader:
        for obj in reader:
            datas.append(obj)
    return datas
def load_json(path):
with open(path, "r") as reader:
datas = json.load(reader)
return datas
def main(dp_size, dp_rank, dp_master_ip, dp_master_port, GPUs_per_dp_rank, data_inps):
os.environ["VLLM_DP_RANK"] = str(dp_rank)
os.environ["VLLM_DP_SIZE"] = str(dp_size)
os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)
# set devices for each dp_rank
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
str(i) for i in range(dp_rank * GPUs_per_dp_rank, (dp_rank + 1) *
GPUs_per_dp_rank))
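    # Example GPU assignment (assuming GPUs are numbered contiguously from 0):
    # with GPUs_per_dp_rank=2, rank 0 sees GPUs 0,1; rank 1 sees GPUs 2,3; and so on.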
    # With data parallelism, each rank should process a different slice of the
    # dataset; together the DP ranks cover the full set of prompts.
    prompts_per_rank = len(data_inps) // dp_size
    start = dp_rank * prompts_per_rank
    # Give the remainder of the division to the last rank so no prompt is dropped.
    end = len(data_inps) if dp_rank == dp_size - 1 else start + prompts_per_rank
    this_data_inps = data_inps[start:end]
    if len(this_data_inps) == 0:
        # If a rank has no prompts to process, give it a placeholder record so the
        # engine still has something to run on.
        this_data_inps = [{"qa_prompt": "Placeholder"}]
    print(f"DP rank {dp_rank} needs to process {len(this_data_inps)} prompts")
    # Create a sampling params object. Every rank could in principle use different
    # sampling params; here all ranks share the same settings from the CLI args.
    max_tokens = args.max_tokens
    sampling_params = SamplingParams(temperature=0.1, top_k=20, top_p=0.8,
                                     repetition_penalty=1.05, max_tokens=max_tokens)
    model_name = f"/share/minghao/Models/{args.model}"
    max_model_len = args.max_model_len
    max_num_seqs = args.max_num_seqs
    # Create an LLM instance; tensor parallelism spans the GPUs assigned to this DP rank.
    llm = LLM(model=model_name,
              tensor_parallel_size=GPUs_per_dp_rank,
              max_model_len=max_model_len,
              enforce_eager=True,
              gpu_memory_utilization=0.9,
              max_num_seqs=max_num_seqs)
batch_size = 2000
save_dir = args.save_dir
os.makedirs(save_dir, exist_ok=True)
save_name = f'{dp_rank}.jsonl'
save_path = os.path.join(save_dir, save_name)
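    # Each DP rank appends to its own file (0.jsonl, 1.jsonl, ...), so concurrent ranks
    # never write to the same file. Every line is the original clip record augmented with
    # 'qa_prompt' (the chat-formatted prompt sent to the model) and 'generated_qa'
    # (the model's raw text output).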
with open(save_path, 'a') as file:
for i in range(0, len(this_data_inps), batch_size):
start = time.time()
batch_this_data_inps = this_data_inps[i:i+batch_size]
batch_prompts = [tmp['qa_prompt'] for tmp in batch_this_data_inps]
outputs = llm.generate(batch_prompts, sampling_params)
            print(f'Inference finished. Total outputs: {len(outputs)}')
for idx, output in enumerate(outputs):
this_inp = batch_this_data_inps[idx]
prompt = output.prompt
generated_qa = output.outputs[0].text
this_inp['qa_prompt'] = prompt
this_inp.update({"generated_qa": generated_qa})
file.write(json.dumps(this_inp) + "\n")
                file.flush()  # flush after each record as extra insurance against losing results
end = time.time()
del batch_this_data_inps, batch_prompts, outputs
gc.collect()
print(f'batch time cost: {end-start}s')
print(f"[Memory] CPU: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2:.2f} MB")
print(f"[Memory] GPU: {torch.cuda.memory_allocated() / 1024 ** 2:.2f} MB")
def read_all_captions(root_caption_dir, caption_file_names):
    caption_file_dir_list = [os.path.join(root_caption_dir, file_name) for file_name in caption_file_names]
    datas = []
    for caption_dir in caption_file_dir_list:
        file_names = sorted(os.listdir(caption_dir))
        caption_file_paths = [os.path.join(caption_dir, name) for name in file_names]
        for path in caption_file_paths:
            datas += load_jsonl(path)
    return datas
if __name__ == "__main__":
    args = get_args()
datas = load_json('/share/minghao/VideoProjects/Sythesis2/Candidates/miradata_youtube_31k_5_10min_filter_clips.json')
print(f'Total Video Size: {len(datas)}')
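    # Expected input format (inferred from the fields accessed below): a list of video
    # entries, each with a 'clips' list; every clip carries 'clip_id', 'video_id',
    # 'dense_caption', 'background_caption', and 'main_object_caption'.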
    # Reorganize the data so that each entry is a single clip (with a globally unique
    # id of the form "<clip_id>_<video_id>"), in a form suitable for synthesis.
new_datas = []
for data in tqdm(datas):
clips = data['clips']
for clip in clips:
clip['clip_id'] = str(clip['clip_id']) + '_' + clip['video_id']
new_datas.extend(clips)
print(f'Total Clips Size: {len(new_datas)}')
datas = new_datas
start = args.start
end = args.end
datas = datas[start:end]
print(f'Start: {start}, End: {end}')
print(f'to process size: {len(datas)}')
save_dir = args.save_dir
if os.path.exists(save_dir):
        have_downloaded = set(get_have_processed(save_dir))
        filter_datas = []
        for data in tqdm(datas, desc='Filtering already-processed clips...'):
if data['clip_id'] in have_downloaded:
continue
else:
filter_datas.append(data)
datas = filter_datas
print(f'have_downloaded size : {len(have_downloaded)}')
print(f'rest to process size : {len(datas)}')
model_name = f"/share/minghao/Models/{args.model}"
# Initialize the tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
prompts = []
data_inps = []
for data in datas:
this_prompt = random.choice(prompt_generate)
dense_caption = data['dense_caption']
background_caption = data['background_caption']
main_object_caption = data['main_object_caption']
system_prompt, user_prompt = this_prompt(dense_caption, background_caption, main_object_caption)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
prompt = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
prompts.append(prompt)
data['qa_prompt'] = prompt
data_inps.append(data)
print(f'Total size: {len(prompts)}')
print(f'Sample show: {prompts[0]}')
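    # Note: for Qwen-style chat templates the rendered prompt is roughly of the form
    # "<|im_start|>system\n...<|im_end|>\n<|im_start|>user\n...<|im_end|>\n<|im_start|>assistant\n";
    # the exact markup depends on the tokenizer's chat template.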
    # Launch one worker process per DP rank; each rank runs main() on its slice of the data.
    start = time.time()
    dp_master_ip = "127.0.0.1"
    dp_master_port = get_open_port()
    procs = []
    GPUs_per_dp_rank = args.GPUs_per_dp_rank
    DP_size = args.DP_size
    for i in range(DP_size):
        proc = Process(target=main,
                       args=(DP_size, i, dp_master_ip, dp_master_port,
                             GPUs_per_dp_rank, data_inps))
        proc.start()
        procs.append(proc)
    print('All DP workers launched; did anything OOM?')
    exit_code = 0
    for proc in procs:
        proc.join()
        if proc.exitcode:
            exit_code = proc.exitcode
    end = time.time()
    print(f'Total size: {len(prompts)}', f'Total time cost: {end - start}s')
    exit(exit_code)