# -*- coding: utf-8 -*-
"""
Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/193Qwk9yyPHgI0H84JJOchTovg_CELJuw
"""

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.optim import AdamW  # transformers.AdamW is deprecated/removed
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split  # was used unimported
from transformers import BertTokenizer, BertModel, get_linear_schedule_with_warmup
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
# torchmetrics replaces the removed pytorch_lightning.metrics module.
from torchmetrics.functional import auroc

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

LABEL_COLUMNS = ['anger', 'joy', 'fear', 'surprise', 'sadness', 'neutral']

# Preparing training data
train_file_path = './sample_data/train_data.csv'
train_data = pd.read_csv(train_file_path)

# Keep only the rows labelled with one of the six emotions of interest.
filtro = train_data['emotion'].isin(LABEL_COLUMNS)
df = train_data[filtro].copy()  # .copy() avoids SettingWithCopyWarning below

# One-hot encode the emotion label into one binary column per emotion
# (replaces the original six-branch if/elif chain; same result).
for emotion in LABEL_COLUMNS:
    df[emotion] = (df['emotion'] == emotion).astype(int)

# Drop the original label plus all metadata columns; only the essay text
# and the six binary label columns are needed for training.
df.drop(['emotion', 'message_id', 'response_id', 'article_id', 'empathy',
         'distress', 'empathy_bin', 'distress_bin', 'gender', 'education',
         'race', 'age', 'income', 'personality_conscientiousness',
         'personality_openess', 'personality_extraversion',
         'personality_agreeableness', 'personality_stability',
         'iri_perspective_taking', 'iri_personal_distress', 'iri_fantasy',
         'iri_empathatic_concern', 'raw_input_emotions'],
        axis=1, inplace=True)
print(df.head())

train_df, val_df = train_test_split(df, test_size=0.05, random_state=RANDOM_SEED)
print(train_df.shape, val_df.shape)

# Inspect one training example.
sample_row = train_df.iloc[16]
sample_comment = sample_row.essay
sample_labels = sample_row[LABEL_COLUMNS]
print(sample_comment)
print(sample_labels.to_dict())

BERT_MODEL_NAME = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

encoding = tokenizer.encode_plus(
    sample_comment,
    add_special_tokens=True,
    max_length=512,
    return_token_type_ids=False,
    padding="max_length",
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)
print(encoding.keys())
print(encoding["input_ids"].shape, encoding["attention_mask"].shape)
print(encoding["input_ids"].squeeze()[:20])
print(encoding["attention_mask"].squeeze()[:20])
print(tokenizer.convert_ids_to_tokens(encoding["input_ids"].squeeze())[:20])
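# Not in the original notebook: a quick, illustrative check of how many
# essays would actually be truncated at a given token budget. This matters
# because EmotionDataset below defaults to max_token_len=128 while training
# later uses MAX_TOKEN_COUNT=512. Assumes `train_df` and `tokenizer` as
# defined above; a sketch, not part of the training pipeline.
token_counts = [len(tokenizer.encode(text)) for text in train_df.essay]
print("essays longer than 128 tokens:", sum(c > 128 for c in token_counts))
print("essays longer than 512 tokens:", sum(c > 512 for c in token_counts))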
class EmotionDataset(Dataset):
    """Wraps the dataframe so each item yields tokenized text plus labels."""

    def __init__(self, data: pd.DataFrame, tokenizer: BertTokenizer,
                 max_token_len: int = 128):
        self.tokenizer = tokenizer
        self.data = data
        self.max_token_len = max_token_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index: int):
        data_row = self.data.iloc[index]
        comment_text = data_row.essay
        labels = data_row[LABEL_COLUMNS]
        encoding = self.tokenizer.encode_plus(
            comment_text,
            add_special_tokens=True,
            max_length=self.max_token_len,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return dict(
            comment_text=comment_text,
            input_ids=encoding["input_ids"].flatten(),
            attention_mask=encoding["attention_mask"].flatten(),
            # .astype(float) guards against the object dtype pandas uses
            # when slicing a mixed-type row.
            labels=torch.FloatTensor(labels.values.astype(float))
        )


bert_model = BertModel.from_pretrained(BERT_MODEL_NAME, return_dict=True)

train_dataset = EmotionDataset(train_df, tokenizer)
sample_item = train_dataset[0]
print(sample_item.keys())

sample_batch = next(iter(DataLoader(train_dataset, batch_size=8, num_workers=2)))
print(sample_batch["input_ids"].shape, sample_batch["attention_mask"].shape)

output = bert_model(sample_batch["input_ids"], sample_batch["attention_mask"])
print(output.last_hidden_state.shape, output.pooler_output.shape)


class EmotionDataModule(pl.LightningDataModule):
    def __init__(self, train_df, test_df, tokenizer, batch_size=8,
                 max_token_len=128):
        super().__init__()
        self.batch_size = batch_size
        self.train_df = train_df
        self.test_df = test_df
        self.tokenizer = tokenizer
        self.max_token_len = max_token_len

    def setup(self, stage=None):
        self.train_dataset = EmotionDataset(
            self.train_df, self.tokenizer, self.max_token_len)
        self.test_dataset = EmotionDataset(
            self.test_df, self.tokenizer, self.max_token_len)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size,
                          shuffle=True, num_workers=2)

    def val_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          num_workers=2)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          num_workers=2)
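# Not in the original notebook: a quick smoke test of the data module before
# committing to a full training run. Illustrative only; the expected shapes
# assume the defaults above (batch_size=8, max_token_len=128, six labels).
_demo_module = EmotionDataModule(train_df, val_df, tokenizer)
_demo_module.setup()
_demo_batch = next(iter(_demo_module.train_dataloader()))
print(_demo_batch["input_ids"].shape)  # expected: torch.Size([8, 128])
print(_demo_batch["labels"].shape)     # expected: torch.Size([8, 6])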
N_EPOCHS = 10
BATCH_SIZE = 12
MAX_TOKEN_COUNT = 512

data_module = EmotionDataModule(
    train_df,
    val_df,
    tokenizer,
    batch_size=BATCH_SIZE,
    max_token_len=MAX_TOKEN_COUNT
)


class EmotionTagger(pl.LightningModule):
    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(BERT_MODEL_NAME, return_dict=True)
        self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        # BCELoss expects probabilities, hence the sigmoid in forward().
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask, labels=None):
        output = self.bert(input_ids, attention_mask=attention_mask)
        output = self.classifier(output.pooler_output)
        output = torch.sigmoid(output)
        loss = 0
        if labels is not None:
            loss = self.criterion(output, labels)
        return loss, output

    def training_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("train_loss", loss, prog_bar=True, logger=True)
        return {"loss": loss, "predictions": outputs, "labels": labels}

    def validation_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("val_loss", loss, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        loss, outputs = self(input_ids, attention_mask, labels)
        self.log("test_loss", loss, prog_bar=True, logger=True)
        return loss

    def training_epoch_end(self, outputs):
        # The per-class AUROC logging below sat detached from any method in
        # the original; it belongs in this epoch-end hook (PyTorch
        # Lightning < 2.0 API), which aggregates the predictions and labels
        # returned by training_step.
        labels = torch.cat(
            [output["labels"].detach().cpu() for output in outputs]).int()
        predictions = torch.cat(
            [output["predictions"].detach().cpu() for output in outputs])
        for i, name in enumerate(LABEL_COLUMNS):
            # torchmetrics >= 0.11 requires the task argument; older
            # versions accept the two positional arguments alone.
            class_roc_auc = auroc(predictions[:, i], labels[:, i], task="binary")
            self.logger.experiment.add_scalar(
                f"{name}_roc_auc/Train", class_roc_auc, self.current_epoch)

    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=2e-5)
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.n_warmup_steps,
            num_training_steps=self.n_training_steps
        )
        return dict(
            optimizer=optimizer,
            lr_scheduler=dict(
                scheduler=scheduler,
                interval='step'
            )
        )


steps_per_epoch = len(train_df) // BATCH_SIZE
total_training_steps = steps_per_epoch * N_EPOCHS
# Warm up the learning rate over the first fifth of training.
warmup_steps = total_training_steps // 5

model = EmotionTagger(
    n_classes=len(LABEL_COLUMNS),
    n_warmup_steps=warmup_steps,
    n_training_steps=total_training_steps
)

checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    filename="best-checkpoint",
    save_top_k=1,
    verbose=True,
    monitor="val_loss",
    mode="min"
)
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=2)

trainer = pl.Trainer(
    max_epochs=N_EPOCHS,
    callbacks=[early_stopping_callback, checkpoint_callback],
)
trainer.fit(model, data_module)

trained_model = EmotionTagger.load_from_checkpoint(
    trainer.checkpoint_callback.best_model_path,
    n_classes=len(LABEL_COLUMNS)
)
trained_model.eval()
trained_model.freeze()


def run_sentiment_analysis(txt):
    THRESHOLD = 0.5
    encoding = tokenizer.encode_plus(
        txt,
        add_special_tokens=True,
        max_length=512,
        return_token_type_ids=False,
        padding="max_length",
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    _, test_prediction = trained_model(encoding["input_ids"],
                                       encoding["attention_mask"])
    test_prediction = test_prediction.flatten().numpy()
    predictions = []
    for label, prediction in zip(LABEL_COLUMNS, test_prediction):
        if prediction < THRESHOLD:
            continue
        # An f-string is needed here to interpolate the values; the original
        # appended the literal "{label}: {prediction}".
        predictions.append(f"{label}: {prediction}")
    return predictions
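# Illustrative usage of the inference helper. The sample sentence below is
# invented for demonstration, not taken from the dataset; it returns the
# labels whose predicted probability clears the 0.5 threshold.
sample_text = "I was stunned when I read the article; I never saw it coming."
print(run_sentiment_analysis(sample_text))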