"""Perform inference of one model on a dataset and measure time and energy.""" from __future__ import annotations import os import json import copy import atexit from typing import Generator, Literal import tyro import torch import rich from rich.table import Table from fastchat.serve.inference import generate_stream from fastchat.model.model_adapter import load_model, get_conversation_template from zeus.monitor import ZeusMonitor SYSTEM_PROMPTS = { "chat": ( "A chat between a human user (prompter) and an artificial intelligence (AI) assistant. " "The assistant gives helpful, detailed, and polite answers to the user's questions. " ), "chat-concise": ( "A chat between a human user (prompter) and an artificial intelligence (AI) assistant. " "The assistant gives helpful, detailed, and polite answers to the user's questions. " "The assistant's answers are very concise. " ), "instruct": ( "Below is an instruction that describes a task. " "Write a response that appropriately completes the request. " ), "instruct-concise": ( "Below is an instruction that describes a task. " "Write a response that appropriately completes the request. " "The response should be very concise. " ), } def main( model_path: str, input_file: str = "sharegpt/sg_90k_part1_html_cleaned_lang_first_sampled.json", output_dir: str = "data", device_index: int = 0, task: Literal[tuple(SYSTEM_PROMPTS)] = "chat", # type: ignore load_8bit: bool = False, temperature: float = 0.7, repitition_penalty: float = 1.0, max_new_tokens: int = 512, ) -> None: """Run benchmarking for one model on the entire input file. Args: model_path: Path to or Huggingface Hub Id of the model. input_file: Path to the input JSON file. Assumed to be our cleaned ShareGPT data. (Default: "sharegpt/sg_90k_part1_html_cleaned_lang_first_sampled.json") output_dir: Path to the output directory. (Default: "data") device_index: Index of the GPU to use for inference. (Default: 0) task: Type of task to perform inference on. (Default: "chat") load_8bit: Whether to load the model in 8-bit mode. (Default: False) temperature: Temperature to use for sampling. (Default: 0.7) repitition_penalty: Repitition penalty to use for the model. (Default: 1.0) max_new_tokens: Maximum numbers of tokens to generate, ignoring the prompt. (Default: 512) """ # NOTE(JW): ChatGLM is implemented as a special case in FastChat inference. # Also, it's primarily a model that's fine-tuned for Chinese, so it doesn't # make sense to prompt it in English and talk about its verbosity. if "chatglm" in model_path.lower(): raise ValueError("ChatGLM is not supported.") # Get Rich Console instance. console = rich.get_console() # Print out what we're about to do. if model_path.endswith("/"): model_path = model_path[:-1] model_name_cleaned = "--".join(model_path.split("/")[-2:]) output_dir = f"{output_dir}/{task}/{model_name_cleaned}" output_csv_path = f"{output_dir}/benchmark.json" config_json_path = f"{output_dir}/config.json" table = Table(title="Benchmark") table.add_column("Configuration") table.add_column("Value") table.add_row("Model", f"{model_name_cleaned} (path: {model_path})") table.add_row("Input", input_file) table.add_row("Device", f"cuda:{device_index}") table.add_row("Task", task) table.add_row("8-bit", str(load_8bit)) table.add_row("Temperature", f"{temperature:.2f}") table.add_row("Repitition Penalty", f"{repitition_penalty:.2f}") table.add_row("Max New Tokens", str(max_new_tokens)) table.add_row("Output CSV", output_csv_path) table.add_row("Config JSON", config_json_path) console.print(table) # Set the device. torch.cuda.set_device(f"cuda:{device_index}") # Load the model (Huggingface PyTorch) and tokenizer (Huggingface). model, tokenizer = load_model( model_path=model_path, device="cuda", num_gpus=1, max_gpu_memory=None, load_8bit=load_8bit, cpu_offloading=False, gptq_config=None, debug=False, ) # Chats are accumulated in a conversation helper object. conv_base = get_conversation_template(model_path) # Standardize the system prompt for every model. conv_base.system = SYSTEM_PROMPTS[task] conv_base.messages = [] conv_base.offset = 0 gen_params = { "model": model_path, "prompt": "EMPTY", "temperature": temperature, "repitition_penalty": repitition_penalty, "max_new_tokens": max_new_tokens, "stop": conv_base.stop_str, "stop_token_ids": conv_base.stop_token_ids, "echo": False, } monitor = ZeusMonitor(gpu_indices=[torch.cuda.current_device()]) # Output files. # Leave only the last two path components and replace slashes with double dashes. os.makedirs(output_dir, exist_ok=True) output_json = open(output_csv_path, "w") output_json.write("[\n") output_json.flush() # Conclude the JSON file format with a closing bracket. Using `atexit` will # handle all cases of the program exiting, including Ctrl-C and errors. atexit.register(lambda: output_json.write("\n]\n")) # Dump the configuration to a JSON file. with open(config_json_path, "w") as config_json: json.dump( { "model_path": model_path, "input_file": input_file, "device_index": device_index, "task": task, "load_8bit": load_8bit, "temperature": temperature, "repitition_penalty": repitition_penalty, "max_new_tokens": max_new_tokens, }, config_json, indent=4, ) config_json.write("\n") def dataloader(input_file: str) -> Generator[tuple[bool, str], None, None]: """Yields a tuple of whether this is a warmup run and the input prompt.""" for _ in range(3): yield True, "Say something long and random. I don't care about the content." for item in json.load(open(input_file, "r")): input_prompt = item["conversations"][0]["value"] yield False, input_prompt # Warm up the GPU with some random prompts. # Forward through all the prompts. is_first = True for is_warmup, input_prompt in dataloader(input_file): # Construct the input prompt. conv = copy.deepcopy(conv_base) conv.append_message(conv.roles[0], input_prompt) conv.append_message(conv.roles[1], "") prompt = conv.get_prompt() gen_params["prompt"] = prompt # Print input prompt. console.print(f"\n[u cyan]{'Warmup ' if is_warmup else ''}Prompt[/u cyan]:") console.print(prompt.strip() + "\n", markup=False) # Generate the ouptut from the model. output_stream = generate_stream(model, tokenizer, gen_params, device="cuda") output = {} ################################################# # Inference and measurement zone! ################################################# monitor.begin_window("inference") for output in output_stream: pass measurements = monitor.end_window("inference") ################################################# # Record numbers. output_text = output["text"] if not is_warmup: response_length = len(tokenizer.encode(output_text)) # number of tokens latency = measurements.time throughput = response_length / latency energy = measurements.total_energy output = { "model": model_name_cleaned, "throughput": throughput, "response_length": response_length, "latency": latency, "energy": energy, "input": prompt.strip(), "output": output_text.strip(), } output_str = json.dumps(output, indent=4) if not is_warmup: if not is_first: output_json.write(",\n" + output_str) else: is_first = False output_json.write(output_str) output_json.flush() # Print the response. console.print(f"\n[u cyan]{'Warmup ' if is_warmup else ''}Response[/u cyan]:") console.print(output_text.strip() + "\n", markup=False) # Print measurement. console.print(measurements) if __name__ == "__main__": tyro.cli(main)