misdirection_classification / final_classifier.py
Eappelson's picture
Upload final_classifier.py
037b0cf verified
raw
history blame contribute delete
No virus
8.1 kB
# -*- coding: utf-8 -*-
"""final_classifier.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1i2uCPCvqnax-vpQBo43Ri8ivTe0HnqKK
# Installing Packages
"""
# Only install once and then reset runtime
!pip install accelerate
!pip install optuna
"""# Loading Libraries"""
# Loading Packages
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding, EarlyStoppingCallback
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
import optuna
import numpy as np
import random
import accelerate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from google.colab import drive
from transformers import DataCollatorWithPadding
"""# Importing and Cleaning Data"""
# Read the data
drive.mount('/content/drive')
bias = pd.read_csv('/content/drive/MyDrive/hackathon/misdirection.csv')
# Selecting out badly formatted columns
clean_bias = bias.loc[:, 'conversation_id':'unique_id']
# Filtering to just accepted vs. rejected
clean_bias = clean_bias[clean_bias['submission_grade'].isin(['accepted', 'rejected'])]
# Removing all NA under user (these do not help)
clean_bias = clean_bias.dropna(subset=['user'])
# Grouping by unique_id and joining each prompt into a single paragraph
grouped = clean_bias.groupby('unique_id')['user'].apply(lambda x: ' '.join(x)).reset_index()
# Selecting the predictor variable to be these paragraphs
X = grouped["user"].astype(str).tolist()
# Creating the predicted variable to be rejected and accepted as binary
y = clean_bias.groupby('unique_id')['submission_grade'].apply(lambda x: x.iloc[-1]).map({'rejected': 'non-violation','accepted': 'violation'}).tolist()
# Split the data in such a way that y is stratified
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=1, stratify=y)
"""# Tokenizing Data"""
# Load tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)
# Tokenize the data
train_encodings = tokenizer(X_train, truncation=True, padding=True, max_length=256)
test_encodings = tokenizer(X_test, truncation=True, padding=True, max_length=256)
# Creating a customdataset
class CustomDataset(Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
label = 0 if self.labels[idx] == 'non-violation' else 1
item['labels'] = torch.tensor(label, dtype=torch.long)
return item
def __len__(self):
return len(self.labels)
# Create the dataset objects
train_dataset = CustomDataset(train_encodings, [0 if label == 'non-violation' else 1 for label in y_train])
test_dataset = CustomDataset(test_encodings, [0 if label == 'non-violation' else 1 for label in y_test])
"""# Creating Model"""
# Defining the metrics
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
accuracy = accuracy_score(labels, preds)
precision = precision_score(labels, preds, average='weighted')
recall = recall_score(labels, preds, average='weighted')
f1 = f1_score(labels, preds, average='weighted')
return {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1": f1
}
# Objective function for Optuna
def objective(trial):
# Preventing overfitting and defining hyperparameters
dropout_rate = trial.suggest_uniform('dropout_rate', 0.1, 0.5)
training_args = TrainingArguments(
output_dir="./misdirection_classification",
learning_rate=trial.suggest_loguniform('learning_rate', 1e-5, 5e-5),
per_device_train_batch_size=trial.suggest_categorical('batch_size', [8, 16, 32]),
gradient_accumulation_steps=2,
num_train_epochs=trial.suggest_int('num_train_epochs', 3, 10),
weight_decay=trial.suggest_loguniform('weight_decay', 1e-4, 1e-1),
save_strategy="epoch",
evaluation_strategy="epoch",
logging_dir="./logs",
logging_steps=10,
load_best_model_at_end=True,
metric_for_best_model="f1",
push_to_hub=False,
)
# Tokenizing the data
train_encodings_fold = tokenizer(X_train, truncation=True, padding=True, max_length=256)
val_encodings_fold = tokenizer(X_test, truncation=True, padding=True, max_length=256)
# Creating dataset objects
train_dataset_fold = CustomDataset(train_encodings_fold, y_train)
val_dataset_fold = CustomDataset(val_encodings_fold, y_test)
# Initializing a new model
model_fold = model_init(dropout_rate)
# Defining the trainer
trainer = Trainer(
model=model_fold,
args=training_args,
train_dataset=train_dataset_fold,
eval_dataset=val_dataset_fold,
tokenizer=tokenizer,
compute_metrics=compute_metrics,
)
# Training the model
trainer.train()
eval_result = trainer.evaluate(eval_dataset=val_dataset_fold)
accuracy = eval_result['eval_accuracy']
precision = eval_result['eval_precision']
recall = eval_result['eval_recall']
f1 = eval_result['eval_f1']
# Calculate the composite score using average metrics (f1 yielded best results in end)
composite_score = (
0.25 * accuracy +
0.25 * precision +
0.25 * recall +
0.25 * f1
)
return f1
# Model initialization function
def model_init(dropout_rate):
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)
model.classifier.dropout = torch.nn.Dropout(p=dropout_rate)
return model
# Run Optuna optimization
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=15)
"""# Final Model"""
# Retrieve the best parameters from the Optuna study
best_params = study.best_params
# Define training arguments using the best parameters
training_args = TrainingArguments(
output_dir="predicting_misdirection",
learning_rate=best_params['learning_rate'],
per_device_train_batch_size=best_params['batch_size'],
gradient_accumulation_steps=2,
num_train_epochs=best_params['num_train_epochs'],
weight_decay=best_params['weight_decay'],
save_strategy="epoch",
evaluation_strategy="epoch",
logging_dir="logs",
logging_steps=10,
load_best_model_at_end=True,
metric_for_best_model="f1",
push_to_hub=False,
)
# Define a data collator
data_collator = DataCollatorWithPadding(tokenizer)
# Initialize the trainer with the specified arguments
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=test_dataset,
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
"""# Training Final Model"""
# Training the final model on hyperparameters
trainer.train()
"""# Evaluating Final Mode"""
# Getting evaluation results
eval_result = trainer.evaluate(eval_dataset=test_dataset)
for key, value in eval_result.items():
print(f"{key}: {value}")
# Getting confusion matrix
predictions = trainer.predict(test_dataset)
predicted_labels = np.argmax(predictions.predictions, axis=1)
true_labels = [item['labels'].item() for item in test_dataset]
cm = confusion_matrix(true_labels, predicted_labels)
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()