import os.path
import time
from pathlib import Path
from typing import Callable, Optional, Tuple

import pandas as pd
from datasets import Dataset
from optimum.onnxruntime import (
    ORTModelForSequenceClassification,
    ORTOptimizer,
    ORTQuantizer,
)
from optimum.onnxruntime.configuration import (
    AutoCalibrationConfig,
    AutoOptimizationConfig,
    AutoQuantizationConfig,
)
from optimum.pipelines import pipeline as opt_pipeline
from pynvml import nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, nvmlInit
from sklearn.metrics import roc_auc_score
from transformers import AutoTokenizer, PreTrainedModel, PreTrainedTokenizer, pipeline
from transformers.pipelines.base import KeyDataset

from detoxify.detoxify import load_checkpoint


def get_gpu_utilization() -> int:
    nvmlInit()
    handle = nvmlDeviceGetHandleByIndex(0)
    info = nvmlDeviceGetMemoryInfo(handle)
    return info.used // 1024**2  # memory in MB


def load_data(base_path: Path, nrows: Optional[int] = None) -> pd.DataFrame:
    labels_path = base_path / "test_labels.csv"
    test_path = base_path / "test.csv"
    labels_df = pd.read_csv(labels_path, index_col=0, nrows=nrows)
    test_df = pd.read_csv(test_path, index_col=0, nrows=nrows)
    # Attach the ground-truth labels to the test comments (aligned by index).
    test_df["label"] = labels_df
    return test_df


def get_toxicity(result):
    return list(filter(lambda r: r["label"] == "toxicity", result))[0]["score"]


def evaluate_devices(data_path: Path, evaluate_model_fn: Callable, **kwargs):
    # Time a small sample on CPU and the full test set on GPU.
    small_df = load_data(data_path, nrows=1000)
    cpu_eval = evaluate_model_fn("cpu", small_df, **kwargs)
    big_df = load_data(data_path)
    gpu_eval = evaluate_model_fn("cuda:0", big_df, **kwargs)
    return {
        "scores": gpu_eval["scores"],
        "samples_per_second_cpu": len(small_df) / cpu_eval["time_seconds"],
        "samples_per_second_gpu": len(big_df) / gpu_eval["time_seconds"],
        "gpu_memory_mb": gpu_eval["gpu_memory_mb"],
    }


def evaluate_pipeline(pipe, df):
    results = pipe(
        KeyDataset(Dataset.from_pandas(df), "content"),
        top_k=None,
        batch_size=4,
        padding="longest",
        truncation=True,
    )
    # The pipeline yields predictions lazily, so inference is timed while the
    # iterator is consumed below.
    t1 = time.time()
    toxicity_pred = pd.Series(map(get_toxicity, results), index=df.index)
    t2 = time.time()
    scores = {
        "all": roc_auc_score(df.label, toxicity_pred),
    }
    languages = ["it", "fr", "ru", "pt", "es", "tr"]
    for lang in languages:
        idx = df.lang == lang
        scores[lang] = roc_auc_score(df[idx].label, toxicity_pred[idx])
    return {
        "scores": scores,
        "time_seconds": t2 - t1,
        "gpu_memory_mb": get_gpu_utilization(),
    }


def load_original_model(device: str) -> Tuple[PreTrainedModel, PreTrainedTokenizer]:
    model, tokenizer, class_names = load_checkpoint(
        model_type="multilingual", device=device
    )
    identity_classes = [
        "male",
        "female",
        "homosexual_gay_or_lesbian",
        "christian",
        "jewish",
        "muslim",
        "black",
        "white",
        "psychiatric_or_mental_illness",
    ]
    model.config.id2label = {n: c for n, c in enumerate(class_names + identity_classes)}
    model.config.label2id = {c: n for n, c in enumerate(class_names + identity_classes)}
    return model, tokenizer


def evaluate_original_model(device: str, test_df: pd.DataFrame):
    model, tokenizer = load_original_model(device)
    pipe = pipeline(
        model=model,
        task="text-classification",
        tokenizer=tokenizer,
        function_to_apply="sigmoid",
        device=device,
    )
    return evaluate_pipeline(pipe, test_df)


def save_original_model(base_path: Path = Path(".")):
    model, tokenizer = load_original_model("cpu")
    pipe = pipeline(
        model=model,
        task="text-classification",
        tokenizer=tokenizer,
        function_to_apply="sigmoid",
    )
    pipe.save_pretrained(base_path)


def evaluate_ort_model(
    device: str, test_df: pd.DataFrame, base_path: Path = Path(".")
):
    model = ORTModelForSequenceClassification.from_pretrained(base_path, export=True)
    tokenizer = AutoTokenizer.from_pretrained(base_path, device=device)
    pipe = opt_pipeline(
        model=model,
        task="text-classification",
        tokenizer=tokenizer,
        function_to_apply="sigmoid",
        device=device,
        accelerator="ort",
    )
    return evaluate_pipeline(pipe, test_df)


def evaluate_ort_optimize_model(
    device: str, test_df: pd.DataFrame, base_path: Path = Path(".")
):
    tokenizer = AutoTokenizer.from_pretrained(base_path, device=device)
    if not os.path.exists(base_path / "model_optimized.onnx"):
        model = ORTModelForSequenceClassification.from_pretrained(
            base_path, export=True
        )
        # oconfig = AutoOptimizationConfig.O1(fp16=True)
        oconfig = AutoOptimizationConfig.O4()
        optimizer = ORTOptimizer.from_pretrained(model)
        optimizer.optimize(
            save_dir=base_path,
            optimization_config=oconfig,
        )
    model = ORTModelForSequenceClassification.from_pretrained(
        base_path, file_name="model_optimized.onnx"
    )
    pipe = opt_pipeline(
        model=model,
        task="text-classification",
        function_to_apply="sigmoid",
        device=device,
        accelerator="ort",
        tokenizer=tokenizer,
    )
    return evaluate_pipeline(pipe, test_df)


def evaluate_ort_quantize_model(
    device: str,
    test_df: pd.DataFrame,
    base_path: Path = Path("."),
    overwrite: bool = False,
):
    tokenizer = AutoTokenizer.from_pretrained(base_path, device=device)
    if overwrite or not os.path.exists(base_path / "model_quantized.onnx"):
        model = ORTModelForSequenceClassification.from_pretrained(
            base_path, export=True
        )
        qconfig = AutoQuantizationConfig.avx2(is_static=True, per_channel=False)
        quantizer = ORTQuantizer.from_pretrained(model)

        def preprocess_fn(ex):
            return tokenizer(ex["content"])

        # Calibrate based on the dataset
        calibration_dataset = (
            Dataset.from_pandas(test_df)
            .map(preprocess_fn)
            .select_columns(["input_ids", "attention_mask"])
        )
        calibration_config = AutoCalibrationConfig.minmax(calibration_dataset)
        ranges = quantizer.fit(
            dataset=calibration_dataset,
            calibration_config=calibration_config,
            operators_to_quantize=qconfig.operators_to_quantize,
        )
        quantizer.quantize(
            save_dir=base_path,
            quantization_config=qconfig,
            calibration_tensors_range=ranges,
        )
    model = ORTModelForSequenceClassification.from_pretrained(
        base_path, file_name="model_quantized.onnx"
    )
    pipe = opt_pipeline(
        model=model,
        task="text-classification",
        function_to_apply="sigmoid",
        device=device,
        accelerator="ort",
        tokenizer=tokenizer,
    )
    return evaluate_pipeline(pipe, test_df)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "data_path",
        type=str,
        help="Path to the Jigsaw multilingual toxic comment data. "
        'For example: "jigsaw_data/jigsaw-multilingual-toxic-comment-classification"',
    )
    parser.add_argument(
        "--models_path",
        type=str,
        default=".",
        help="Path to the model weights directory (root of the repo)",
    )
    parser.add_argument(
        "model",
        type=str,
        help="Model to evaluate (original, ort, optimized, quantized).",
    )
    args = parser.parse_args()

    data = Path(args.data_path)
    models_p = Path(args.models_path)
    if args.model == "original":
        print(evaluate_devices(data, evaluate_original_model))
    elif args.model == "ort":
        print(evaluate_devices(data, evaluate_ort_model, base_path=models_p))
    elif args.model == "optimized":
        print(evaluate_devices(data, evaluate_ort_optimize_model, base_path=models_p))
    elif args.model == "quantized":
        print(evaluate_devices(data, evaluate_ort_quantize_model, base_path=models_p))
    else:
        raise ValueError(f"Invalid model received: {args.model!r}")