# NOTE(review): the notebook export included stray Colab UI text here
# ("Spaces:" / "Runtime error" lines); converted to comments so the file parses.
# -*- coding: utf-8 -*-
"""
Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/193Qwk9yyPHgI0H84JJOchTovg_CELJuw
"""
import shutil

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn as nn
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from transformers import AdamW, BertModel, BertTokenizer, get_linear_schedule_with_warmup
# Pin every RNG we rely on so runs are reproducible.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
# ---- Preparing training data -------------------------------------------
# Load the raw CSV and keep only rows labeled with one of the six emotions
# we model.
train_file_path = '/content/sample_data/train_data.csv'
train_data = pd.read_csv(train_file_path)

EMOTION_CLASSES = ['anger', 'fear', 'joy', 'sadness', 'neutral', 'surprise']
# .copy() gives us an independent frame; assigning new columns to a plain
# boolean-mask view triggers pandas' SettingWithCopyWarning and may silently
# fail to write.
df = train_data[train_data['emotion'].isin(EMOTION_CLASSES)].copy()

# One-hot encode the emotion label: one 0/1 column per class.  This replaces
# the original six-branch if/elif that appended to six parallel lists.
for emotion in EMOTION_CLASSES:
    df[emotion] = (df['emotion'] == emotion).astype(int)

# Drop the raw label plus all metadata columns we do not train on.
df.drop(['emotion', 'message_id', 'response_id', 'article_id', 'empathy', 'distress',
         'empathy_bin', 'distress_bin', 'gender', 'education', 'race', 'age', 'income',
         'personality_conscientiousness', 'personality_openess', 'personality_extraversion',
         'personality_agreeableness', 'personality_stability', 'iri_perspective_taking',
         'iri_personal_distress', 'iri_fantasy', 'iri_empathatic_concern',
         'raw_input_emotions'],
        axis=1, inplace=True)
print(df.head())

# Hold out 5% for validation; fix random_state so the split is reproducible.
train_df, val_df = train_test_split(df, test_size=0.05, random_state=RANDOM_SEED)
train_df.shape, val_df.shape
# Order of the one-hot label columns; the model's output units follow it.
LABEL_COLUMNS = ['anger', 'joy', 'fear', 'surprise', 'sadness', 'neutral']

# Peek at a single training row to sanity-check the dataframe layout.
sample_row = train_df.iloc[16]
sample_comment = sample_row.essay
sample_labels = sample_row[LABEL_COLUMNS]
print(sample_comment)
print(sample_labels.to_dict())
# Tokenizer sanity check: encode one essay and inspect shapes and tokens.
BERT_MODEL_NAME = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

encoding = tokenizer.encode_plus(
    sample_comment,
    add_special_tokens=True,
    max_length=512,
    return_token_type_ids=False,
    padding="max_length",
    return_attention_mask=True,
    return_tensors='pt',
)
# Notebook-style inspection: these bare expressions display in Colab and are
# harmless no-ops when run as a script.
encoding.keys()
encoding["input_ids"].shape, encoding["attention_mask"].shape
encoding["input_ids"].squeeze()[:20]
encoding["attention_mask"].squeeze()[:20]
print(tokenizer.convert_ids_to_tokens(encoding["input_ids"].squeeze())[:20])
class EmotionDataset(Dataset):
    """Dataset of (essay text, one-hot emotion labels) rows for BERT.

    Each item is tokenized on the fly into fixed-length ``input_ids`` /
    ``attention_mask`` tensors plus a float label vector ordered by the
    module-level ``LABEL_COLUMNS``.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: BertTokenizer,
        max_token_len: int = 128
    ):
        self.tokenizer = tokenizer
        self.data = data
        self.max_token_len = max_token_len

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> dict:
        data_row = self.data.iloc[index]
        comment_text = data_row.essay
        labels = data_row[LABEL_COLUMNS]
        encoding = self.tokenizer.encode_plus(
            comment_text,
            add_special_tokens=True,
            max_length=self.max_token_len,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return dict(
            comment_text=comment_text,
            input_ids=encoding["input_ids"].flatten(),
            attention_mask=encoding["attention_mask"].flatten(),
            # Convert via a plain list with an explicit dtype:
            # torch.FloatTensor(<pandas Series>) is fragile across
            # torch/pandas versions.
            labels=torch.tensor(labels.tolist(), dtype=torch.float)
        )
# Load the bare BERT encoder once and probe its output shapes on one batch.
bert_model = BertModel.from_pretrained(BERT_MODEL_NAME, return_dict=True)

train_dataset = EmotionDataset(train_df, tokenizer)
sample_item = train_dataset[0]
sample_item.keys()

# Pull a single batch through a DataLoader to confirm tensor shapes.
sample_batch = next(iter(DataLoader(train_dataset, batch_size=8, num_workers=2)))
sample_batch["input_ids"].shape, sample_batch["attention_mask"].shape

output = bert_model(sample_batch["input_ids"], sample_batch["attention_mask"])
output.last_hidden_state.shape, output.pooler_output.shape
class EmotionDataModule(pl.LightningDataModule):
    """Lightning data module building train/val/test loaders from dataframes.

    The ``test_df`` frame doubles as both the validation and the test split.
    """

    def __init__(self, train_df, test_df, tokenizer, batch_size=8, max_token_len=128):
        super().__init__()
        self.train_df = train_df
        self.test_df = test_df
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.max_token_len = max_token_len

    def setup(self, stage=None):
        # One EmotionDataset per split.
        self.train_dataset = EmotionDataset(self.train_df, self.tokenizer, self.max_token_len)
        self.test_dataset = EmotionDataset(self.test_df, self.tokenizer, self.max_token_len)

    def _loader(self, dataset, shuffle=False):
        # Shared DataLoader construction for the three split accessors.
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle, num_workers=2)

    def train_dataloader(self):
        return self._loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        return self._loader(self.test_dataset)

    def test_dataloader(self):
        return self._loader(self.test_dataset)
# Training hyperparameters.
N_EPOCHS = 10
BATCH_SIZE = 12
MAX_TOKEN_COUNT = 512  # full BERT context length

data_module = EmotionDataModule(
    train_df,
    val_df,
    tokenizer,
    batch_size=BATCH_SIZE,
    max_token_len=MAX_TOKEN_COUNT,
)
class EmotionTagger(pl.LightningModule):
    """BERT encoder with a linear head for multi-label emotion tagging.

    ``forward`` returns ``(loss, probabilities)``; loss is 0 when no labels
    are supplied (inference mode).
    """

    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(BERT_MODEL_NAME, return_dict=True)
        self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        # BCELoss expects probabilities — hence the explicit sigmoid below.
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask, labels=None):
        output = self.bert(input_ids, attention_mask=attention_mask)
        # Classify from the pooled [CLS] representation.
        output = self.classifier(output.pooler_output)
        output = torch.sigmoid(output)
        loss = 0
        if labels is not None:
            loss = self.criterion(output, labels)
        return loss, output

    def training_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {"loss": loss, "predictions": outputs, "labels": labels}

    def validation_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("test_loss", loss, prog_bar=True, logger=True)
        return loss

    # BUG FIX: the original file had a stray class-level ``for`` loop here
    # computing per-label AUROC via ``pytorch_lightning.metrics`` (an API
    # removed from Lightning).  It referenced names (``predictions``,
    # ``labels``, ``self``) that are undefined at class-body scope and would
    # raise NameError the moment this class was defined, so it was removed.
    # If per-label AUROC is wanted, implement it in an epoch-end hook using
    # the ``torchmetrics`` package.

    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=2e-5)
        # Linear warmup then linear decay, stepped every optimizer step.
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.n_warmup_steps,
            num_training_steps=self.n_training_steps
        )
        return dict(
            optimizer=optimizer,
            lr_scheduler=dict(
                scheduler=scheduler,
                interval='step'
            )
        )
# Scheduler bookkeeping: total optimizer steps across the run, with a 20%
# linear warmup.
steps_per_epoch = len(train_df) // BATCH_SIZE
total_training_steps = steps_per_epoch * N_EPOCHS
warmup_steps = total_training_steps // 5

model = EmotionTagger(
    n_classes=len(LABEL_COLUMNS),
    n_warmup_steps=warmup_steps,
    n_training_steps=total_training_steps,
)
# BUG FIX: the original used IPython shell magics (``!rm -rf ...``), which
# are a SyntaxError in a plain Python script.  Remove stale logs and
# checkpoints portably; ignore_errors covers the dirs-don't-exist case.
shutil.rmtree("lightning_logs", ignore_errors=True)
shutil.rmtree("checkpoints", ignore_errors=True)
# Keep only the single best checkpoint by validation loss, and stop early
# after two epochs without improvement.
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_loss",
    mode="min",
)
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=2)

trainer = pl.Trainer(
    max_epochs=N_EPOCHS,
    callbacks=[early_stopping_callback, checkpoint_callback],
)
trainer.fit(model, data_module)
# Reload the best checkpoint found during training and switch it to
# inference mode.
trained_model = EmotionTagger.load_from_checkpoint(
    trainer.checkpoint_callback.best_model_path,
    n_classes=len(LABEL_COLUMNS),
)
trained_model.eval()
trained_model.freeze()  # disable gradients on every parameter
def run_sentiment_analysis(txt):
    """Run the trained tagger on *txt* and return the predicted emotions.

    Uses the module-level ``tokenizer`` and frozen ``trained_model``.
    Returns a list of ``"label: probability"`` strings, one per emotion
    whose sigmoid output is >= THRESHOLD.
    """
    THRESHOLD = 0.5
    encoding = tokenizer.encode_plus(
        txt,
        add_special_tokens=True,
        max_length=512,
        truncation=True,  # clip texts longer than 512 tokens (was missing)
        return_token_type_ids=False,
        padding="max_length",
        return_attention_mask=True,
        return_tensors='pt',
    )
    _, test_prediction = trained_model(encoding["input_ids"], encoding["attention_mask"])
    test_prediction = test_prediction.flatten().numpy()
    predictions = []
    for label, prediction in zip(LABEL_COLUMNS, test_prediction):
        if prediction < THRESHOLD:
            continue
        # BUG FIX: the original appended the literal template string
        # "{label}: {prediction}" — an f-string interpolates the values.
        predictions.append(f"{label}: {prediction}")
    return predictions