# Author: p4vv37
# Commit 9cad313: Evaluating code quality before comment generation.
import functools

import gradio as gr
import requests
import torch
from torch import nn
from torch.nn import CrossEntropyLoss
from transformers import AutoTokenizer, T5ForConditionalGeneration, AutoModelForSeq2SeqLM, T5Config
# Fixed number of token ids in every encoded model input: encode_diff() passes it
# as the tokenizer's max_length and pad_assert() truncates/pads to exactly this size.
MAX_SOURCE_LENGTH = 512
class ReviewerModel(T5ForConditionalGeneration):
    """T5 conditional-generation model with an extra two-way classification head.

    Besides the stock ``T5ForConditionalGeneration`` behaviour, :meth:`forward`
    dispatches on keyword arguments to two additional code paths:

    * ``cls`` present      -> :meth:`cls` (classification from the encoder output)
    * ``input_labels`` present -> :meth:`review_forward` (joint encoder/decoder pass)
    """

    def __init__(self, config):
        super().__init__(config)
        # Linear layer mapping an encoder hidden vector to 2 class logits.
        self.cls_head = nn.Linear(self.config.d_model, 2, bias=True)
        self.init()

    def init(self):
        """(Re-)initialise the LM head and the classification head parameters."""
        nn.init.xavier_uniform_(self.lm_head.weight)
        factor = self.config.initializer_factor
        self.cls_head.weight.data.normal_(
            mean=0.0,
            std=factor * (self.config.d_model ** -0.5),
        )
        self.cls_head.bias.data.zero_()

    def forward(self, *argv, **kwargs):
        """Dispatch to the classification, review, or stock T5 forward pass.

        The route is chosen by the *presence* of a keyword (its value is not
        inspected for ``cls``):

        * ``cls`` in kwargs: requires ``input_ids``, ``labels`` and
          ``attention_mask``; returns the result of :meth:`cls`. Any other
          keyword arguments are ignored on this path.
        * ``input_labels`` in kwargs: requires ``input_ids``, ``input_labels``,
          ``decoder_input_ids``, ``attention_mask`` and
          ``decoder_attention_mask``; optional ``encoder_loss`` (default
          ``True``); returns the result of :meth:`review_forward`.
        * otherwise: everything is forwarded unchanged to
          ``T5ForConditionalGeneration.forward``.
        """
        if "cls" in kwargs:
            assert (
                "input_ids" in kwargs
                and "labels" in kwargs
                and "attention_mask" in kwargs
            ), "Please give these arg keys."
            return self.cls(
                input_ids=kwargs["input_ids"],
                labels=kwargs["labels"],
                attention_mask=kwargs["attention_mask"],
            )
        if "input_labels" in kwargs:
            assert (
                "input_ids" in kwargs
                and "input_labels" in kwargs
                and "decoder_input_ids" in kwargs
                and "attention_mask" in kwargs
                and "decoder_attention_mask" in kwargs
            ), "Please give these arg keys."
            return self.review_forward(
                kwargs["input_ids"],
                kwargs["input_labels"],
                kwargs["decoder_input_ids"],
                kwargs["attention_mask"],
                kwargs["decoder_attention_mask"],
                kwargs.get("encoder_loss", True),
            )
        return super().forward(*argv, **kwargs)

    def cls(
        self,
        input_ids,
        labels,
        attention_mask,
    ):
        """Classify each input sequence from the encoder output at position 0.

        Returns the cross-entropy loss when ``labels`` is given, otherwise the
        raw logits produced by ``cls_head`` (last dimension of size 2).
        """
        encoder_outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=False,
            return_dict=False,
        )
        hidden_states = encoder_outputs[0]
        first_hidden = hidden_states[:, 0, :]
        # Use the functional form tied to self.training: a freshly constructed
        # nn.Dropout module is always in training mode, which would keep dropout
        # active (and predictions random) even after model.eval().
        first_hidden = nn.functional.dropout(first_hidden, p=0.3, training=self.training)
        logits = self.cls_head(first_hidden)
        if labels is not None:
            return CrossEntropyLoss()(logits, labels)
        return logits

    def review_forward(
        self,
        input_ids,
        input_labels,
        decoder_input_ids,
        attention_mask,
        decoder_attention_mask,
        encoder_loss=True,
    ):
        """Run encoder and decoder and return the combined training loss.

        The decoder is fed ``decoder_input_ids`` shifted right and the LM loss is
        computed against the unshifted ``decoder_input_ids`` (id 0 is ignored).
        When ``encoder_loss`` is true and ``input_labels`` is given, a token-level
        loss over the encoder states (projected with the input embedding matrix)
        is added; label ``-100`` is ignored there.

        Returns the loss tensor, or ``(cls_logits, lm_logits)`` when
        ``decoder_input_ids`` is ``None`` (``cls_logits`` is ``None`` if
        ``encoder_loss`` is false).
        """
        encoder_outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_attentions=False,
            return_dict=False,
        )
        hidden_states = encoder_outputs[0]
        # NOTE(review): decoder_input_ids is used here before the None check
        # further down; confirm whether a None value is ever expected.
        decoder_inputs = self._shift_right(decoder_input_ids)
        # Decode
        decoder_outputs = self.decoder(
            input_ids=decoder_inputs,
            attention_mask=decoder_attention_mask,
            encoder_hidden_states=hidden_states,
            encoder_attention_mask=attention_mask,
            output_attentions=False,
            return_dict=False,
        )
        sequence_output = decoder_outputs[0]
        if self.config.tie_word_embeddings:  # this is True default
            sequence_output = sequence_output * (self.model_dim ** -0.5)
        # Always bind cls_logits so the final return cannot raise NameError when
        # encoder_loss is disabled.
        cls_logits = None
        if encoder_loss:
            cls_logits = nn.functional.linear(
                hidden_states, self.encoder.get_input_embeddings().weight
            )
        lm_logits = self.lm_head(sequence_output)
        if decoder_input_ids is not None:
            lm_loss_fct = CrossEntropyLoss(ignore_index=0)  # Warning: PAD_ID should be 0
            loss = lm_loss_fct(lm_logits.view(-1, lm_logits.size(-1)), decoder_input_ids.view(-1))
            if encoder_loss and input_labels is not None:
                cls_loss_fct = CrossEntropyLoss(ignore_index=-100)
                loss = loss + cls_loss_fct(
                    cls_logits.view(-1, cls_logits.size(-1)), input_labels.view(-1)
                )
            return loss
        return cls_logits, lm_logits
@functools.lru_cache(maxsize=1)
def prepare_models():
    """Load the ``microsoft/codereviewer`` tokenizer and model, once per process.

    The tokenizer is annotated with the ids of the special tokens used by this
    file (``special_dict`` for ``<e0>``..``<e99>`` plus ``*_id`` attributes).
    The model is switched to eval mode.

    The result is memoised so repeated calls return the same
    ``(tokenizer, model)`` pair instead of reloading the weights each time.

    Returns:
        tuple: ``(tokenizer, model)``.
    """
    tokenizer = AutoTokenizer.from_pretrained("microsoft/codereviewer")
    # Build the vocabulary mapping a single time rather than once per lookup.
    vocab = tokenizer.get_vocab()
    tokenizer.special_dict = {
        f"<e{i}>": vocab[f"<e{i}>"] for i in range(99, -1, -1)
    }
    special_token_attrs = (
        ("mask_id", "<mask>"),
        ("bos_id", "<s>"),
        ("pad_id", "<pad>"),
        ("eos_id", "</s>"),
        ("msg_id", "<msg>"),
        ("keep_id", "<keep>"),
        ("add_id", "<add>"),
        ("del_id", "<del>"),
        ("start_id", "<start>"),
        ("end_id", "<end>"),
    )
    for attr_name, token in special_token_attrs:
        setattr(tokenizer, attr_name, vocab[token])

    config = T5Config.from_pretrained("microsoft/codereviewer")
    model = ReviewerModel.from_pretrained("microsoft/codereviewer", config=config)
    model.eval()
    return tokenizer, model
def pad_assert(tokenizer, source_ids):
    """Frame ``source_ids`` with BOS/EOS and pad to ``MAX_SOURCE_LENGTH`` ids.

    The input is first cut to ``MAX_SOURCE_LENGTH - 2`` ids so that the two
    framing ids always fit; the remainder is filled with ``tokenizer.pad_id``.

    Returns:
        list: exactly ``MAX_SOURCE_LENGTH`` token ids.
    """
    content_budget = MAX_SOURCE_LENGTH - 2
    framed = [tokenizer.bos_id] + source_ids[:content_budget] + [tokenizer.eos_id]
    missing = MAX_SOURCE_LENGTH - len(framed)
    framed.extend([tokenizer.pad_id] * missing)
    assert len(framed) == MAX_SOURCE_LENGTH, "Not equal length."
    return framed
def encode_diff(tokenizer, diff, msg, source):
    """Encode one diff hunk (plus commit message and source text) as token ids.

    The model input string is built as
    ``<s>{source}</s><msg>{msg}`` followed by one ``<add>``/``<del>``/``<keep>``
    tagged entry per non-blank hunk line, chosen from the line's first
    character (``+`` / ``-`` / anything else).

    Args:
        tokenizer: tokenizer exposing ``encode`` and the ids read by ``pad_assert``.
        diff: text of a single hunk; the ``@@`` header line may be preceded by
            blank lines (``FileDiffs`` emits hunks that start with ``"\\n"``).
        msg: commit message text.
        source: source text placed between ``<s>`` and ``</s>``.

    Returns:
        list: ``MAX_SOURCE_LENGTH`` token ids (see ``pad_assert``).
    """
    raw_lines = diff.split("\n")
    # Drop everything up to and including the "@@ ... @@" hunk header. Blindly
    # dropping only the first line kept the header (tagged as <keep>) whenever
    # the hunk text began with a newline. Without any header, fall back to the
    # historical behaviour of discarding just the first line.
    header_idx = next(
        (idx for idx, line in enumerate(raw_lines) if line.startswith("@@")),
        0,
    )
    difflines = [line for line in raw_lines[header_idx + 1:] if line.strip()]

    tag_for_marker = {"-": "<del>", "+": "<add>"}
    pieces = ["<s>", source, "</s>", "<msg>", msg]
    for line in difflines:
        # Any marker other than "+" or "-" (including context " ") counts as keep.
        pieces.append(tag_for_marker.get(line[0], "<keep>"))
        pieces.append(line[1:].strip())
    inputstr = "".join(pieces)

    # [1:-1] removes the first and last ids returned by encode(); pad_assert
    # then adds the BOS/EOS framing itself.
    source_ids = tokenizer.encode(inputstr, max_length=MAX_SOURCE_LENGTH, truncation=True)[1:-1]
    return pad_assert(tokenizer, source_ids)
class FileDiffs(object):
    """One file's section of a ``git diff``, split into its hunks.

    ``diff_string`` is the text that follows ``"diff --git"`` for a single file,
    i.e. it starts with the `` a/<old> b/<new>`` header line.

    Attributes (documented here rather than inline):
        file_name: the raw first line of the section.
        file_path: the old-side path (the part after ``a/``), without
            surrounding whitespace.
        diffs: list of hunk strings; each starts with ``"\\n@@"`` and contains
            the hunk's lines joined by ``"\\n"``.

    Raises:
        ValueError: if the header line contains no ``a/`` path.
    """

    def __init__(self, diff_string):
        diff_array = diff_string.split("\n")
        self.file_name = diff_array[0]
        self.file_path = self._parse_old_path(self.file_name)
        self.diffs = []
        # Scan every line after the header instead of assuming exactly three
        # metadata lines: new/deleted/renamed files carry extra header lines,
        # and binary files have no hunks at all.
        for line in diff_array[1:]:
            if line.startswith("@@"):
                self.diffs.append("")
            elif not self.diffs:
                # Still inside the per-file metadata (index/---/+++/mode ...).
                continue
            self.diffs[-1] += "\n" + line

    @staticmethod
    def _parse_old_path(header):
        """Extract the ``a/`` side path from an ``a/<old> b/<new>`` header."""
        _, marker, spec = header.strip().partition("a/")
        if not marker:
            raise ValueError(f"Unrecognised diff header: {header!r}")
        # Unrenamed file: spec is "<path> b/<path>". Splitting it exactly in the
        # middle is robust to paths that themselves contain "b/" (e.g. "lib/").
        half, odd = divmod(len(spec) - 3, 2)
        if not odd and half >= 0:
            candidate = spec[:half]
            if spec == candidate + " b/" + candidate:
                return candidate
        # Renamed file (paths differ): take everything before the first " b/".
        return spec.partition(" b/")[0].strip()
def review_commit(user="p4vv37", repository="ueflow", commit="610a8c7b02b946bc9e5e26e6dacbba0e2abba259"):
    """Generate review comments for every hunk of a GitHub commit.

    Steps: fetch the commit message and diff from the GitHub API, split the diff
    per file and per hunk, ask the model's classification path whether a hunk
    needs review, and for those that do, generate a comment with beam search.

    Args:
        user: GitHub account that owns the repository.
        repository: repository name.
        commit: commit SHA.

    Returns:
        str: a report with one ``File:<path>`` line per changed file, followed by
        each reviewed hunk and the first generated comment for it.

    Raises:
        requests.HTTPError: if the GitHub API answers with an error status
            (e.g. unknown commit or rate limiting).
    """
    request_timeout = 30  # seconds; avoids hanging forever on a stalled connection
    tokenizer, model = prepare_models()

    # Get diff and commit metadata from GitHub API
    commit_url = f"https://api.github.com/repos/{user}/{repository}/commits/{commit}"
    metadata_response = requests.get(commit_url, timeout=request_timeout)
    metadata_response.raise_for_status()
    msg = metadata_response.json()["commit"]["message"]

    diff_response = requests.get(
        commit_url,
        headers={"Accept": "application/vnd.github.diff"},
        timeout=request_timeout,
    )
    diff_response.raise_for_status()
    code_diff = diff_response.text

    # Parse diff into FileDiffs objects (the text before the first marker is empty)
    files_diffs = [FileDiffs(chunk) for chunk in code_diff.split("diff --git") if chunk]

    # Generate comments for each diff
    report = []
    for fd in files_diffs:
        report.append(f"File:{fd.file_path}\n")
        # NOTE(review): the ref segment is written as "^<sha>" and the response
        # status is not checked, so an error body would be used as `source`.
        # Confirm this URL resolves as intended.
        source = requests.get(
            f"https://raw.githubusercontent.com/{user}/{repository}/^{commit}/{fd.file_path}",
            timeout=request_timeout,
        ).text
        for diff in fd.diffs:
            inputs = torch.tensor([encode_diff(tokenizer, diff, msg, source)], dtype=torch.long).to("cpu")
            inputs_mask = inputs.ne(tokenizer.pad_id)

            # Classification pass: only these keywords are read on the `cls`
            # route of ReviewerModel.forward, and no gradients are needed.
            with torch.no_grad():
                logits = model(
                    input_ids=inputs,
                    cls=True,
                    attention_mask=inputs_mask,
                    labels=None,
                )
            needs_review = torch.argmax(logits, dim=-1).cpu().numpy()[0]
            if not needs_review:
                continue

            preds = model.generate(
                inputs,
                attention_mask=inputs_mask,
                use_cache=True,
                num_beams=5,
                early_stopping=True,
                max_length=100,
                num_return_sequences=2,
            )
            preds = list(preds.cpu().numpy())
            # The first two generated ids are skipped before decoding.
            pred_nls = [
                tokenizer.decode(_id[2:], skip_special_tokens=True, clean_up_tokenization_spaces=False)
                for _id in preds
            ]
            report.append(diff + "\n#######\nComment:\n#######\n" + pred_nls[0] + "\n#######\n")
    return "".join(report)
# Descriptive text handed to the Gradio interface.
description = (
    'An interface for running '
    '"Microsoft CodeBERT CodeReviewer: Pre-Training for Automating Code Review Activities." '
    '(microsoft/codereviewer) on GitHub commits.'
)

# Sample (user, repository, commit) argument triples for review_commit.
examples = [
    ["p4vv37", "ueflow", "610a8c7b02b946bc9e5e26e6dacbba0e2abba259"],
    ["microsoft", "vscode", "378b0d711f6b82ac59b47fb246906043a6fb995a"],
]

# Three text inputs map onto review_commit's three parameters; the returned
# report is shown as text.
iface = gr.Interface(
    fn=review_commit,
    inputs=["text", "text", "text"],
    outputs="text",
    description=description,
    examples=examples,
)
iface.launch()