# -*- coding: utf-8 -*- """final_classifier.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1i2uCPCvqnax-vpQBo43Ri8ivTe0HnqKK # Installing Packages """ # Only install once and then reset runtime !pip install accelerate !pip install optuna """# Loading Libraries""" # Loading Packages import pandas as pd from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding, EarlyStoppingCallback import torch from torch.utils.data import Dataset, DataLoader import torch.nn.functional as F from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score, roc_auc_score, confusion_matrix from sklearn.utils.class_weight import compute_class_weight import optuna import numpy as np import random import accelerate from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from google.colab import drive from transformers import DataCollatorWithPadding """# Importing and Cleaning Data""" # Read the data drive.mount('/content/drive') bias = pd.read_csv('/content/drive/MyDrive/hackathon/misdirection.csv') # Selecting out badly formatted columns clean_bias = bias.loc[:, 'conversation_id':'unique_id'] # Filtering to just accepted vs. rejected clean_bias = clean_bias[clean_bias['submission_grade'].isin(['accepted', 'rejected'])] # Removing all NA under user (these do not help) clean_bias = clean_bias.dropna(subset=['user']) # Grouping by unique_id and joining each prompt into a single paragraph grouped = clean_bias.groupby('unique_id')['user'].apply(lambda x: ' '.join(x)).reset_index() # Selecting the predictor variable to be these paragraphs X = grouped["user"].astype(str).tolist() # Creating the predicted variable to be rejected and accepted as binary y = clean_bias.groupby('unique_id')['submission_grade'].apply(lambda x: x.iloc[-1]).map({'rejected': 'non-violation','accepted': 'violation'}).tolist() # Split the data in such a way that y is stratified X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=1, stratify=y) """# Tokenizing Data""" # Load tokenizer and model tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased") model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2) # Tokenize the data train_encodings = tokenizer(X_train, truncation=True, padding=True, max_length=256) test_encodings = tokenizer(X_test, truncation=True, padding=True, max_length=256) # Creating a customdataset class CustomDataset(Dataset): def __init__(self, encodings, labels): self.encodings = encodings self.labels = labels def __getitem__(self, idx): item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} label = 0 if self.labels[idx] == 'non-violation' else 1 item['labels'] = torch.tensor(label, dtype=torch.long) return item def __len__(self): return len(self.labels) # Create the dataset objects train_dataset = CustomDataset(train_encodings, [0 if label == 'non-violation' else 1 for label in y_train]) test_dataset = CustomDataset(test_encodings, [0 if label == 'non-violation' else 1 for label in y_test]) """# Creating Model""" # Defining the metrics def compute_metrics(pred): labels = pred.label_ids preds = pred.predictions.argmax(-1) accuracy = accuracy_score(labels, preds) precision = precision_score(labels, preds, average='weighted') recall = recall_score(labels, preds, average='weighted') f1 = f1_score(labels, preds, average='weighted') return { "accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1 } # Objective function for Optuna def objective(trial): # Preventing overfitting and defining hyperparameters dropout_rate = trial.suggest_uniform('dropout_rate', 0.1, 0.5) training_args = TrainingArguments( output_dir="./misdirection_classification", learning_rate=trial.suggest_loguniform('learning_rate', 1e-5, 5e-5), per_device_train_batch_size=trial.suggest_categorical('batch_size', [8, 16, 32]), gradient_accumulation_steps=2, num_train_epochs=trial.suggest_int('num_train_epochs', 3, 10), weight_decay=trial.suggest_loguniform('weight_decay', 1e-4, 1e-1), save_strategy="epoch", evaluation_strategy="epoch", logging_dir="./logs", logging_steps=10, load_best_model_at_end=True, metric_for_best_model="f1", push_to_hub=False, ) # Tokenizing the data train_encodings_fold = tokenizer(X_train, truncation=True, padding=True, max_length=256) val_encodings_fold = tokenizer(X_test, truncation=True, padding=True, max_length=256) # Creating dataset objects train_dataset_fold = CustomDataset(train_encodings_fold, y_train) val_dataset_fold = CustomDataset(val_encodings_fold, y_test) # Initializing a new model model_fold = model_init(dropout_rate) # Defining the trainer trainer = Trainer( model=model_fold, args=training_args, train_dataset=train_dataset_fold, eval_dataset=val_dataset_fold, tokenizer=tokenizer, compute_metrics=compute_metrics, ) # Training the model trainer.train() eval_result = trainer.evaluate(eval_dataset=val_dataset_fold) accuracy = eval_result['eval_accuracy'] precision = eval_result['eval_precision'] recall = eval_result['eval_recall'] f1 = eval_result['eval_f1'] # Calculate the composite score using average metrics (f1 yielded best results in end) composite_score = ( 0.25 * accuracy + 0.25 * precision + 0.25 * recall + 0.25 * f1 ) return f1 # Model initialization function def model_init(dropout_rate): model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2) model.classifier.dropout = torch.nn.Dropout(p=dropout_rate) return model # Run Optuna optimization study = optuna.create_study(direction='maximize') study.optimize(objective, n_trials=15) """# Final Model""" # Retrieve the best parameters from the Optuna study best_params = study.best_params # Define training arguments using the best parameters training_args = TrainingArguments( output_dir="predicting_misdirection", learning_rate=best_params['learning_rate'], per_device_train_batch_size=best_params['batch_size'], gradient_accumulation_steps=2, num_train_epochs=best_params['num_train_epochs'], weight_decay=best_params['weight_decay'], save_strategy="epoch", evaluation_strategy="epoch", logging_dir="logs", logging_steps=10, load_best_model_at_end=True, metric_for_best_model="f1", push_to_hub=False, ) # Define a data collator data_collator = DataCollatorWithPadding(tokenizer) # Initialize the trainer with the specified arguments trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=test_dataset, tokenizer=tokenizer, data_collator=data_collator, compute_metrics=compute_metrics, ) """# Training Final Model""" # Training the final model on hyperparameters trainer.train() """# Evaluating Final Mode""" # Getting evaluation results eval_result = trainer.evaluate(eval_dataset=test_dataset) for key, value in eval_result.items(): print(f"{key}: {value}") # Getting confusion matrix predictions = trainer.predict(test_dataset) predicted_labels = np.argmax(predictions.predictions, axis=1) true_labels = [item['labels'].item() for item in test_dataset] cm = confusion_matrix(true_labels, predicted_labels) import matplotlib.pyplot as plt import seaborn as sns plt.figure(figsize=(10, 7)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues') plt.xlabel('Predicted Labels') plt.ylabel('True Labels') plt.title('Confusion Matrix') plt.show()