import os
import pickle

import pandas as pd
import torch
import torch.nn as nn
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

import train as tr
from data_loader import NewsDataset
from model import LSTMModel
from preprocessing import preprocess_text

version = 9

if __name__ == "__main__":
    # Earlier pipeline for the two-file dataset (data_1), kept for reference:
    # fake_path = './data_1/Fake.csv'
    # true_path = './data_1/True.csv'
    # cleaned_path = './cleaned_news_data.csv'
    #
    # # Load data
    # try:
    #     df = pd.read_csv(cleaned_path)
    #     df.dropna(inplace=True)
    #     print("Cleaned data found.")
    # except FileNotFoundError:
    #     print("No cleaned data found. Cleaning data now...")
    #     # Load the datasets
    #     true_news = pd.read_csv('data_1/True.csv')
    #     fake_news = pd.read_csv('data_1/Fake.csv')
    #     # Add labels
    #     true_news['label'] = 1
    #     fake_news['label'] = 0
    #     # Combine the datasets
    #     df = pd.concat([true_news, fake_news], ignore_index=True)
    #     # Drop unnecessary columns
    #     df.drop(columns=['subject', 'date'], inplace=True)
    #     df['title'] = df['title'].apply(preprocess_text)
    #     df['text'] = df['text'].apply(preprocess_text)
    #     df.to_csv('cleaned_news_data.csv', index=False)
    #     df.dropna(inplace=True)

    data_path = "./data_2/WELFake_Dataset.csv"
    cleaned_path = f"./output/version_{version}/cleaned_news_data_{version}.csv"

    # Create the output directory up front; the cleaned CSV, tokenizer,
    # and model checkpoints are all written into it
    os.makedirs(os.path.dirname(cleaned_path), exist_ok=True)

    # Load the cleaned data if it exists, otherwise clean the raw dataset
    try:
        df = pd.read_csv(cleaned_path)
        df.dropna(inplace=True)
        print("Cleaned data found.")
    except FileNotFoundError:
        print("No cleaned data found. Cleaning data now...")
        df = pd.read_csv(data_path)

        # Drop the index column shipped with the CSV
        df.drop(df.columns[0], axis=1, inplace=True)
        df.dropna(inplace=True)

        # Swap the labels, since the original encoding is the opposite
        # of the convention used here (1 = real, 0 = fake)
        df["label"] = df["label"].map({0: 1, 1: 0})

        df["title"] = df["title"].apply(preprocess_text)
        df["text"] = df["text"].apply(preprocess_text)

        df.to_csv(cleaned_path, index=False)
        print("Cleaned data saved.")

    # Splitting the data: 60% train, 20% validation, 20% test
    train_val, test = train_test_split(df, test_size=0.2, random_state=42)
    train, val = train_test_split(
        train_val, test_size=0.25, random_state=42
    )  # 0.25 * 0.8 = 0.2

    # Fit the tokenizer on the training data only; the " " separator keeps
    # the last word of each title from fusing with the first word of its text
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(train["title"] + " " + train["text"])

    with open(f"./output/version_{version}/tokenizer_{version}.pickle", "wb") as handle:
        pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

    # Tokenize the data
    X_train_title = tokenizer.texts_to_sequences(train["title"])
    X_train_text = tokenizer.texts_to_sequences(train["text"])
    X_val_title = tokenizer.texts_to_sequences(val["title"])
    X_val_text = tokenizer.texts_to_sequences(val["text"])
    X_test_title = tokenizer.texts_to_sequences(test["title"])
    X_test_text = tokenizer.texts_to_sequences(test["text"])

    # Padding sequences to a fixed length
    max_length = 500
    X_train_title = pad_sequences(X_train_title, maxlen=max_length)
    X_train_text = pad_sequences(X_train_text, maxlen=max_length)
    X_val_title = pad_sequences(X_val_title, maxlen=max_length)
    X_val_text = pad_sequences(X_val_text, maxlen=max_length)
    X_test_title = pad_sequences(X_test_title, maxlen=max_length)
    X_test_text = pad_sequences(X_test_text, maxlen=max_length)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    # +1 because Keras word indices start at 1; index 0 is reserved for padding
    model = LSTMModel(len(tokenizer.word_index) + 1).to(device)
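
    # Note (assumption, not confirmed by this file): model.LSTMModel is
    # expected to take the two token-id tensors (title, text) and return one
    # sigmoid probability per sample, since nn.BCELoss below requires outputs
    # already in (0, 1). A minimal compatible sketch, with hypothetical
    # layer sizes:
    #
    #   class LSTMModel(nn.Module):
    #       def __init__(self, vocab_size, embed_dim=128, hidden_dim=64):
    #           super().__init__()
    #           self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
    #           self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
    #           self.fc = nn.Linear(2 * hidden_dim, 1)
    #
    #       def forward(self, titles, texts):
    #           _, (h_title, _) = self.lstm(self.embedding(titles))
    #           _, (h_text, _) = self.lstm(self.embedding(texts))
    #           h = torch.cat([h_title[-1], h_text[-1]], dim=1)
    #           return torch.sigmoid(self.fc(h))  # shape: (batch, 1)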
    # Convert data to PyTorch tensors
    train_data = NewsDataset(
        torch.tensor(X_train_title),
        torch.tensor(X_train_text),
        torch.tensor(train["label"].values),
    )
    val_data = NewsDataset(
        torch.tensor(X_val_title),
        torch.tensor(X_val_text),
        torch.tensor(val["label"].values),
    )
    test_data = NewsDataset(
        torch.tensor(X_test_title),
        torch.tensor(X_test_text),
        torch.tensor(test["label"].values),
    )

    train_loader = DataLoader(
        train_data,
        batch_size=32,
        shuffle=True,
        num_workers=6,
        pin_memory=True,
        persistent_workers=True,
    )
    val_loader = DataLoader(
        val_data,
        batch_size=32,
        shuffle=False,
        num_workers=6,
        pin_memory=True,
        persistent_workers=True,
    )
    test_loader = DataLoader(
        test_data,
        batch_size=32,
        shuffle=False,
        num_workers=6,
        pin_memory=True,
        persistent_workers=True,
    )

    # BCELoss expects probabilities, so the model must end in a sigmoid
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    trained_model, best_accuracy, best_epoch = tr.train(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        version=version,
        epochs=10,
        device=device,
        max_grad_norm=1.0,
        early_stopping_patience=3,
        early_stopping_delta=0.01,
    )
    print(f"Best model was saved at epoch: {best_epoch}")

    # Load the best model before testing
    best_model_path = f"./output/version_{version}/best_model_{version}.pth"
    model.load_state_dict(torch.load(best_model_path, map_location=device))

    # Testing
    model.eval()
    true_labels = []
    predicted_labels = []
    predicted_probs = []
    with torch.no_grad():
        correct = 0
        total = 0
        for titles, texts, labels in test_loader:
            titles, texts, labels = (
                titles.to(device),
                texts.to(device),
                labels.to(device).float(),
            )
            # reshape(-1) flattens (batch, 1) to (batch,); a bare .squeeze()
            # would collapse a final batch of size 1 to a 0-d tensor and
            # break the .extend() calls below
            outputs = model(titles, texts).reshape(-1)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())
            predicted_probs.extend(outputs.cpu().numpy())

    test_accuracy = 100 * correct / total
    f1 = f1_score(true_labels, predicted_labels)
    auc_roc = roc_auc_score(true_labels, predicted_probs)
    print(
        f"Test Accuracy: {test_accuracy:.2f}%, F1 Score: {f1:.4f}, AUC-ROC: {auc_roc:.4f}"
    )

    # Save per-sample true and predicted labels for a confusion matrix
    confusion_data = pd.DataFrame({"True": true_labels, "Predicted": predicted_labels})
    confusion_data.to_csv(
        f"./output/version_{version}/confusion_matrix_data_{version}.csv", index=False
    )
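
    # Follow-up sketch (an assumption, meant to be run separately, not part
    # of this script): the saved CSV can be turned into an actual confusion
    # matrix with scikit-learn:
    #
    #   from sklearn.metrics import confusion_matrix
    #   data = pd.read_csv(f"./output/version_{version}/confusion_matrix_data_{version}.csv")
    #   print(confusion_matrix(data["True"], data["Predicted"]))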