import gradio as gr import torch from transformers import GPT2LMHeadModel, T5Tokenizer model_name = "akiFQC/japanese-dialogpt-small-aozora" tokenizer = T5Tokenizer.from_pretrained(model_name) tokenizer.do_lower_case = True # due to some bug of tokenizer config loading model = GPT2LMHeadModel.from_pretrained(model_name) class DialogGPT: def __init__(self, tokenizer, model, n_candidate=4, param_lambda=0.10): self.tokenizer = tokenizer self.model = model self.model.eval() self.n_candidate = n_candidate self.param_lambda = param_lambda self.param_gamma: int = 2 def _calc_single_scores(self, token_ids): with torch.inference_mode(): candidate_token_ids = token_ids[:, :-1] label_token_ids = token_ids[:, 1:] outputs = self.model(candidate_token_ids, labels=label_token_ids) _, logits = outputs[:2] logits = torch.log_softmax(logits, dim=-1) logit_at_target = logits.gather( dim=-1, index=candidate_token_ids.unsqueeze(-1) ).squeeze(-1) # mask out pad token positio mask_at_pad = candidate_token_ids == self.tokenizer.pad_token_id # log_likelihood (b, l) log_likelihood = logit_at_target log_likelihood.masked_fill_(mask_at_pad, 0.0) log_likelihood_per_candidate = log_likelihood[:, self.param_gamma:].sum(dim=1) # normalize by length # log_likelihood_per_candidate = log_likelihood_per_candidate / (candidate_token_ids.shape[1] - mask_at_pad.sum(dim=1)) return log_likelihood_per_candidate def _calc_scores(self, sequences, scores, input_ids=None): transition_scores = model.compute_transition_scores( sequences, scores, normalize_logits=True ) if input_ids is None: input_length = 0 else: input_length = input_ids.shape[1] generated_tokens = sequences[:, input_length:] # n x l assert ( generated_tokens.shape[1] == transition_scores.shape[1] ), f"{generated_tokens.shape[1]} != {transition_scores.shape[1]}" # print(transition_scores.shape) # print(generated_tokens) transition_scores.masked_fill_( generated_tokens == self.tokenizer.pad_token_id, 0.0 ) transition_scores = transition_scores.sum(dim=1) # print(transition_scores) return transition_scores def reply(self, reply, history) -> str: chat_history_ids = torch.LongTensor(history).unsqueeze(0) # encode the new user input, add the eos_token and return a tensor in Pytorch new_user_input_ids = self.tokenizer.encode( reply + self.tokenizer.eos_token, return_tensors="pt" ) # append the new user input tokens to the chat history bot_input_ids = ( torch.cat([chat_history_ids, new_user_input_ids], dim=-1) if chat_history_ids is not None else new_user_input_ids ) # generated a response while limiting the total chat history to 1000 tokens, with torch.inference_mode(): output = model.generate( bot_input_ids, pad_token_id=self.tokenizer.pad_token_id, do_sample=True, top_p=0.93, temperature=0.5, repetition_penalty=1.17, max_time=10, num_return_sequences=self.n_candidate, max_length=512, min_length=4, forced_eos_token_id=self.tokenizer.pad_token_id, return_dict_in_generate=True, output_scores=True, min_new_tokens=2, ) # score of each candidate scores_condition_s2t = self._calc_scores( sequences=output.sequences, scores=output.scores, input_ids=bot_input_ids ) new_token_ids = output.sequences[:, bot_input_ids.shape[-1] :] single_scores = self._calc_single_scores(new_token_ids) * self.param_lambda total_scores = scores_condition_s2t - single_scores id_selected = torch.argmax(total_scores) chat_history_ids = output.sequences[id_selected].unsqueeze( 0 ) # update chat history # remove pad token chat_history_ids = chat_history_ids[ :, chat_history_ids[0] != self.tokenizer.pad_token_id ] replay_string = tokenizer.decode( chat_history_ids[:, :][0], skip_special_tokens=False ) return replay_string, chat_history_ids[0].tolist() bot = DialogGPT( tokenizer, model, ) def predict(input, history=[]): replay_string, history = bot.reply(input, history) response = replay_string.split(tokenizer.eos_token) response = [ (response[i], response[i + 1]) for i in range(0, len(response) - 1, 2) ] # convert to tuples of list return response, history with gr.Blocks() as demo: chatbot = gr.Chatbot() state = gr.State([]) with gr.Row(): txt = gr.Textbox( show_label=False, placeholder="Enter text and press enter" ).style(container=False) txt.submit(predict, [txt, state], [chatbot, state]) demo.launch()