import gradio as gr
import spaces
import os
import gc
import random
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", 500)
pd.set_option("display.width", 1000)
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import tokenizers
import transformers
print(f"tokenizers.__version__: {tokenizers.__version__}")
print(f"transformers.__version__: {transformers.__version__}")
print(f"torch.__version__: {torch.__version__}")
print(f"torch cuda version: {torch.version.cuda}")
from transformers import AutoTokenizer, AutoConfig
from transformers import BitsAndBytesConfig, AutoModelForCausalLM, MistralForCausalLM
from peft import LoraConfig, get_peft_model
title = "H2O AI Predict the LLM" | |
description =" The objective of this [competition](https://www.kaggle.com/competitions/h2oai-predict-the-llm) was to \ | |
detect which out of 7 possible LLM models produced a particular response. \n\n\ | |
This demo is utilizing finetuned HuggingFaceH4/zephyr-7b-beta model for a multiclass classification task. \n\n \ | |
We ranked 3rd out of more than 100 participants and our team's solution is [here](https://www.kaggle.com/competitions/h2oai-predict-the-llm/discussion/453728)" | |
title = title + "\n" + description | |
# Theme from - https://huggingface.co/spaces/trl-lib/stack-llama/blob/main/app.py
theme = gr.themes.Monochrome(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
    radius_size=gr.themes.sizes.radius_sm,
    font=[gr.themes.GoogleFont("Open Sans"), "ui-sans-serif", "system-ui", "sans-serif"],
)
### Load the model
class CFG:
    num_workers = os.cpu_count()
    llm_backbone = "HuggingFaceH4/zephyr-7b-beta"  # "save_pretrained_model/zephyr-7b-beta"
    tokenizer_path = "HuggingFaceH4/zephyr-7b-beta"
    tokenizer = AutoTokenizer.from_pretrained(
        tokenizer_path, add_prefix_space=False, use_fast=True, trust_remote_code=True, add_eos_token=True
    )
    batch_size = 1
    max_len = 650
    seed = 42
    num_labels = 7
    lora = True
    lora_r = 4
    lora_alpha = 16
    lora_dropout = 0.05
    lora_target_modules = ""
    gradient_checkpointing = True
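# The backbone is loaded in 4-bit NF4 via bitsandbytes and wrapped with LoRA adapters;
# a small linear head on top of the last-token logits produces the 7 class scores.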
class CustomModel(nn.Module):
    """
    Causal-LM backbone with a linear classification head (multiclass problem type).
    """

    def __init__(self):
        super().__init__()
        self.backbone_config = AutoConfig.from_pretrained(
            CFG.llm_backbone, trust_remote_code=True
        )
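        # Load the 7B backbone in 4-bit NF4 (bitsandbytes) with fp16 compute
        # so the model fits into the Space's GPU memory.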
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            CFG.llm_backbone,
            config=self.backbone_config,
            quantization_config=quantization_config,
        )
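        # Attach LoRA adapters to every Linear/Conv1d projection of the backbone
        # (head layers excluded); the target module names are discovered dynamically.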
        if CFG.lora:
            target_modules = []
            for name, module in self.model.named_modules():
                if (
                    isinstance(module, (torch.nn.Linear, torch.nn.Conv1d))
                    and "head" not in name
                ):
                    name = name.split(".")[-1]
                    if name not in target_modules:
                        target_modules.append(name)
            lora_config = LoraConfig(
                r=CFG.lora_r,
                lora_alpha=CFG.lora_alpha,
                target_modules=target_modules,
                lora_dropout=CFG.lora_dropout,
                bias="none",
                task_type="CAUSAL_LM",
            )
            if CFG.gradient_checkpointing:
                self.model.enable_input_require_grads()
            self.model = get_peft_model(self.model, lora_config)
            self.model.print_trainable_parameters()
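        # Map the vocab-sized logits of the final token to the 7 class labels.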
        self.classification_head = nn.Linear(
            self.backbone_config.vocab_size, CFG.num_labels, bias=False
        )
        self._init_weights(self.classification_head)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.backbone_config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.backbone_config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)
    def forward(
        self,
        batch,
    ):
        # disable cache if gradient checkpointing is enabled
        if CFG.gradient_checkpointing:
            self.model.config.use_cache = False
        self.model.config.pretraining_tp = 1
        output = self.model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        )
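        # Use the LM logits at the last token position as features for the head.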
        output.logits = self.classification_head(output[0][:, -1].float())
        # enable cache again if gradient checkpointing is enabled
        if CFG.gradient_checkpointing:
            self.model.config.use_cache = True
        return output.logits


model = CustomModel()
### End Load the model
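# On ZeroGPU hardware a GPU is attached per call via this decorator
# (assumption: the otherwise-unused `spaces` import above suggests a ZeroGPU
# Space; the decorator is a no-op on classic GPU hardware).
@spaces.GPU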
def do_inference(full_text):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_paths = [
        'model_finetuned/HuggingFaceH4-zephyr-7b-beta_fold0_best.pth']
    def prepare_input(cfg, text):
        # tokenizer(...) with padding="max_length" replaces the deprecated
        # encode_plus(..., pad_to_max_length=True)
        inputs = cfg.tokenizer(
            text,
            add_special_tokens=True,
            max_length=cfg.max_len,
            padding="max_length",
            truncation="longest_first",
        )
        for k, v in inputs.items():
            inputs[k] = torch.tensor(v, dtype=torch.long)
        return inputs
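    # Load the fine-tuned weights (LoRA adapters + classification head);
    # strict=False tolerates keys that belong only to the quantized base model.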
    state = torch.load(model_paths[0], map_location=torch.device("cpu"))
    model.load_state_dict(state["model"], strict=False)
    model.eval()
    model.to(device)
    inputs = prepare_input(CFG, full_text)
    inputs["input_ids"] = inputs["input_ids"].reshape(1, -1).to(device)
    inputs["attention_mask"] = inputs["attention_mask"].reshape(1, -1).to(device)
    with torch.no_grad():
        with torch.amp.autocast(
            device_type="cuda", enabled=True, dtype=torch.float16, cache_enabled=True
        ):
            y_preds = model(inputs)
        y_preds = y_preds.detach().to("cpu").numpy().astype(np.float32)
    y_preds = torch.softmax(torch.tensor(y_preds), 1).numpy()
    result = np.argmax(y_preds)
    label_names = [
        "0. llama2-70b-chat",
        "1. wizardLM-13b",
        "2. llama2-13b-chat",
        "3. wizardLM-70b",
        "4. llama2-7b-chat",
        "5. tinyllama-1b-chat",
        "6. mistral-7b-openorca",
    ]
    if 0 <= result < len(label_names):
        return label_names[result]
    return "Error"
def do_submit(question, response):
    full_text = question + " " + response
    return do_inference(full_text)
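### Build the Gradio UI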
with gr.Blocks(title=title) as demo:  # theme=theme
    sample_examples = pd.read_csv('sample_examples.csv')
    example_list = sample_examples[['Question', 'Response', 'target']].sample(2).values.tolist()
    gr.Markdown(f"## {title}")
    gr.Markdown(description)
    with gr.Row():
        question_text = gr.Textbox(lines=2, placeholder="Question:", label="")
        response_text = gr.Textbox(lines=2, placeholder="Response:", label="")
        target_text = gr.Textbox(lines=1, placeholder="Target:", label="", interactive=False, visible=False)
        llm_num = gr.Textbox(value="", label="LLM #")
    with gr.Row():
        sub_btn = gr.Button("Submit")
        sub_btn.click(fn=do_submit, inputs=[question_text, response_text], outputs=[llm_num])
    gr.Markdown("## Sample Inputs:")
    gr.Examples(
        example_list,
        [question_text, response_text, target_text],
    )
demo.launch()