|
import pytorch_lightning as pl |
|
import torch |
|
from transformers.optimization import AdamW |
|
import torchmetrics |
|
from torchmetrics.classification import F1Score |
|
|
|
|
|
class SequenceClassificationModule(pl.LightningModule): |
|
|
|
def __init__( |
|
self, tokenizer, model, use_question_stance_approach=True, learning_rate=1e-3 |
|
): |
|
super().__init__() |
|
self.tokenizer = tokenizer |
|
self.model = model |
|
self.learning_rate = learning_rate |
|
|
|
self.train_acc = torchmetrics.Accuracy( |
|
task="multiclass", num_classes=model.num_labels |
|
) |
|
self.val_acc = torchmetrics.Accuracy( |
|
task="multiclass", num_classes=model.num_labels |
|
) |
|
self.test_acc = torchmetrics.Accuracy( |
|
task="multiclass", num_classes=model.num_labels |
|
) |
|
|
|
self.train_f1 = F1Score( |
|
task="multiclass", num_classes=model.num_labels, average="macro" |
|
) |
|
self.val_f1 = F1Score( |
|
task="multiclass", num_classes=model.num_labels, average=None |
|
) |
|
self.test_f1 = F1Score( |
|
task="multiclass", num_classes=model.num_labels, average=None |
|
) |
|
|
|
self.use_question_stance_approach = use_question_stance_approach |
|
|
|
def forward(self, input_ids, **kwargs): |
|
return self.model(input_ids, **kwargs) |
|
|
|
def configure_optimizers(self): |
|
optimizer = AdamW(self.parameters(), lr=self.learning_rate) |
|
return optimizer |
|
|
|
def training_step(self, batch, batch_idx): |
|
x, x_mask, y = batch |
|
|
|
outputs = self(x, attention_mask=x_mask, labels=y) |
|
logits = outputs.logits |
|
loss = outputs.loss |
|
|
|
preds = torch.argmax(logits, axis=1) |
|
|
|
self.log("train_loss", loss) |
|
|
|
return {"loss": loss} |
|
|
|
def validation_step(self, batch, batch_idx): |
|
x, x_mask, y = batch |
|
|
|
outputs = self(x, attention_mask=x_mask, labels=y) |
|
logits = outputs.logits |
|
loss = outputs.loss |
|
|
|
preds = torch.argmax(logits, axis=1) |
|
|
|
if not self.use_question_stance_approach: |
|
self.val_acc(preds, y) |
|
self.log("val_acc_step", self.val_acc) |
|
|
|
self.val_f1(preds, y) |
|
self.log("val_loss", loss) |
|
|
|
return {"val_loss": loss, "src": x, "pred": preds, "target": y} |
|
|
|
def validation_epoch_end(self, outs): |
|
if self.use_question_stance_approach: |
|
self.handle_end_of_epoch_scoring(outs, self.val_acc, self.val_f1) |
|
|
|
self.log("val_acc_epoch", self.val_acc) |
|
|
|
f1 = self.val_f1.compute() |
|
self.val_f1.reset() |
|
|
|
self.log("val_f1_epoch", torch.mean(f1)) |
|
|
|
class_names = ["supported", "refuted", "nei", "conflicting"] |
|
for i, c_name in enumerate(class_names): |
|
self.log("val_f1_" + c_name, f1[i]) |
|
|
|
def test_step(self, batch, batch_idx): |
|
x, x_mask, y = batch |
|
|
|
outputs = self(x, attention_mask=x_mask) |
|
logits = outputs.logits |
|
|
|
preds = torch.argmax(logits, axis=1) |
|
|
|
if not self.use_question_stance_approach: |
|
self.test_acc(preds, y) |
|
self.log("test_acc_step", self.test_acc) |
|
self.test_f1(preds, y) |
|
|
|
return {"src": x, "pred": preds, "target": y} |
|
|
|
def test_epoch_end(self, outs): |
|
if self.use_question_stance_approach: |
|
self.handle_end_of_epoch_scoring(outs, self.test_acc, self.test_f1) |
|
|
|
self.log("test_acc_epoch", self.test_acc) |
|
|
|
f1 = self.test_f1.compute() |
|
self.test_f1.reset() |
|
self.log("test_f1_epoch", torch.mean(f1)) |
|
|
|
class_names = ["supported", "refuted", "nei", "conflicting"] |
|
for i, c_name in enumerate(class_names): |
|
self.log("test_f1_" + c_name, f1[i]) |
|
|
|
def handle_end_of_epoch_scoring(self, outputs, acc_scorer, f1_scorer): |
|
gold_labels = {} |
|
question_support = {} |
|
for out in outputs: |
|
srcs = out["src"] |
|
preds = out["pred"] |
|
tgts = out["target"] |
|
|
|
tokens = self.tokenizer.batch_decode( |
|
srcs, skip_special_tokens=True, clean_up_tokenization_spaces=True |
|
) |
|
|
|
for src, pred, tgt in zip(tokens, preds, tgts): |
|
claim_id = src.split("[ question ]")[0] |
|
|
|
if claim_id not in gold_labels: |
|
gold_labels[claim_id] = tgt |
|
question_support[claim_id] = [] |
|
|
|
question_support[claim_id].append(pred) |
|
|
|
for k, gold_label in gold_labels.items(): |
|
support = question_support[k] |
|
|
|
has_unanswerable = False |
|
has_true = False |
|
has_false = False |
|
|
|
for v in support: |
|
if v == 0: |
|
has_true = True |
|
if v == 1: |
|
has_false = True |
|
if v in ( |
|
2, |
|
3, |
|
): |
|
has_unanswerable = True |
|
|
|
if has_unanswerable: |
|
answer = 2 |
|
elif has_true and not has_false: |
|
answer = 0 |
|
elif has_false and not has_true: |
|
answer = 1 |
|
elif has_true and has_false: |
|
answer = 3 |
|
|
|
|
|
acc_scorer( |
|
torch.as_tensor([answer]).to("cuda:0"), |
|
torch.as_tensor([gold_label]).to("cuda:0"), |
|
) |
|
f1_scorer( |
|
torch.as_tensor([answer]).to("cuda:0"), |
|
torch.as_tensor([gold_label]).to("cuda:0"), |
|
) |
|
|