import os import pandas as pd import numpy as np import argparse import datasets import torch import re from thefuzz import process from typing import List from tqdm import tqdm from transformers.trainer_utils import set_seed ''' wget https://people.eecs.berkeley.edu/~hendrycks/data.tar mkdir data/mmlu mv data.tar data/mmlu cd data/mmlu; tar xf data.tar cd ../../ pip install thefuzz python eval/evaluate_chat_mmlu.py -d data/mmlu/data/ ''' from typing import Tuple, List, Union, Iterable import numpy as np import torch import torch.nn.functional as F from transformers import PreTrainedTokenizer from transformers import logging from transformers.generation import LogitsProcessor from typing import TYPE_CHECKING, Optional, Tuple, Union, Callable, List HistoryType = List[Tuple[str, str]] TokensType = List[int] BatchTokensType = List[List[int]] def make_context( tokenizer: PreTrainedTokenizer, query: str, history: List[Tuple[str, str]] = None, system: str = "", max_window_size: int = 6144, chat_format: str = "chatml", ): if history is None: history = [] im_start, im_end = "<|im_start|>", "<|im_end|>" im_start_tokens = [tokenizer.im_start_id] im_end_tokens = [tokenizer.im_end_id] nl_tokens = tokenizer.encode("\n") def _tokenize_str(role, content): return f"{role}\n{content}", tokenizer.encode( role ) + nl_tokens + tokenizer.encode(content) system_text, system_tokens_part = _tokenize_str("system", system) system_tokens = im_start_tokens + system_tokens_part + im_end_tokens raw_text = "" context_tokens = [] for turn_query, turn_response in reversed(history): query_text, query_tokens_part = _tokenize_str("user", turn_query) query_tokens = im_start_tokens + query_tokens_part + im_end_tokens response_text, response_tokens_part = _tokenize_str( "assistant", turn_response ) response_tokens = im_start_tokens + response_tokens_part + im_end_tokens next_context_tokens = nl_tokens + query_tokens + nl_tokens + response_tokens prev_chat = ( f"\n{im_start}{query_text}{im_end}\n{im_start}{response_text}{im_end}" ) current_context_size = ( len(system_tokens) + len(next_context_tokens) + len(context_tokens) ) if current_context_size < max_window_size: context_tokens = next_context_tokens + context_tokens raw_text = prev_chat + raw_text else: break context_tokens = system_tokens + context_tokens raw_text = f"{im_start}{system_text}{im_end}" + raw_text context_tokens += ( nl_tokens + im_start_tokens + _tokenize_str("user", query)[1] + im_end_tokens + nl_tokens + im_start_tokens + tokenizer.encode("assistant") + nl_tokens ) raw_text += f"\n{im_start}user\n{query}{im_end}\n{im_start}assistant\n" return raw_text, context_tokens def chat( model, tokenizer: PreTrainedTokenizer, query: str, history: Optional[HistoryType], system: str = "You are a helpful assistant.", append_history: bool = True ) -> Tuple[str, HistoryType]: if history is None: history = [] raw_text, context_tokens = make_context( tokenizer, query, history=history, system=system, max_window_size=6144, chat_format = "chatml", ) stop_words_ids = [[tokenizer.im_end_id], [tokenizer.im_start_id]] input_ids = torch.tensor([context_tokens]).cuda() outputs = model.generate( input_ids, stop_words_ids = stop_words_ids, return_dict_in_generate = False, ) response = decode_tokens( outputs[0], tokenizer, raw_text_len=len(raw_text), context_length=len(context_tokens), chat_format='chatml', verbose=False, ) if append_history: history.append((query, response)) return response, history def decode_tokens( tokens: Union[torch.LongTensor, TokensType], tokenizer: PreTrainedTokenizer, raw_text_len: int, context_length: int, chat_format: str = "chatml", verbose: bool = False, return_end_reason: bool = False, ) -> str: if torch.is_tensor(tokens): tokens = tokens.cpu().numpy().tolist() return _decode_chatml( tokens, stop_words=[], eod_token_ids=[tokenizer.im_start_id, tokenizer.im_end_id], tokenizer=tokenizer, raw_text_len=raw_text_len, context_length=context_length, verbose=verbose, return_end_reason=return_end_reason, ) def _decode_chatml( tokens: List[int], *, stop_words: List[str], eod_token_ids: List[int], tokenizer: PreTrainedTokenizer, raw_text_len: int, context_length: int, verbose: bool = False, return_end_reason: bool = False, chat_format = "chatml", ): end_reason = f"Gen length {len(tokens)}" eod_token_idx = context_length for eod_token_idx in range(context_length, len(tokens)): if tokens[eod_token_idx] in eod_token_ids: end_reason = f"Gen {tokenizer.decode([tokens[eod_token_idx]])!r}" break trim_decode_tokens = tokenizer.decode(tokens[:eod_token_idx])[raw_text_len:] if verbose: print("\nRaw Generate w/o EOD:", tokenizer.decode(tokens)[raw_text_len:]) print("\nRaw Generate:", trim_decode_tokens) print("\nEnd Reason:", end_reason) for stop_word in stop_words: trim_decode_tokens = trim_decode_tokens.replace(stop_word, "").strip() trim_decode_tokens = trim_decode_tokens.strip() if verbose: print("\nGenerate:", trim_decode_tokens) if return_end_reason: return trim_decode_tokens, end_reason else: return trim_decode_tokens def load_models_tokenizer(args): from transformers import AutoModelForCausalLM, AutoTokenizer from transformers.generation import GenerationConfig tokenizer = AutoTokenizer.from_pretrained(args.checkpoint_path, trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained(args.checkpoint_path, device_map="auto", trust_remote_code=True).eval() model.generation_config = GenerationConfig.from_pretrained(args.checkpoint_path, trust_remote_code=True) model.generation_config.do_sample = False # use greedy decoding return model, tokenizer def format_example(line): example = 'The following is a multiple-choice question. Please choose the most suitable one among A, B, C and D as the answer to this question.\n\n' + line['question'] + "\n" for choice in choices: example += f'{choice}. {line[f"{choice}"]}\n' return example def process_before_extraction(gen, choice_dict): # replace the choice by letter in the generated sentence # from longest one to shortest one for key, val in sorted(choice_dict.items(), key=lambda x: len(x[1]), reverse=True): pattern = re.compile(re.escape(val.rstrip(".")), re.IGNORECASE) gen = pattern.sub(key, gen) return gen def extract_choice(gen, choice_list): # answer is A | choice is A | choose A res = re.search(r"(?:(?:[Cc]hoose)|(?:(?:[Aa]nswer|[Cc]hoice)(?![^ABCD]{0,20}?(?:n't|not))[^ABCD]{0,10}?\b(?:|is|:|be))\b)[^ABCD]{0,20}?\b(A|B|C|D)\b", gen) # A is correct | A is right if res is None: res = re.search(r"\b(A|B|C|D)\b(?![^ABCD]{0,8}?(?:n't|not)[^ABCD]{0,5}?(?:correct|right))[^ABCD]{0,10}?\b(?:correct|right)\b", gen) # straight answer: A if res is None: res = re.search(r"^(A|B|C|D)(?:\.|,|:|$)", gen) # simply extract the first appearred letter if res is None: res = re.search(r"(?