| |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| import torch.nn.functional as F |
| from torch.utils.data import Dataset, DataLoader |
| from transformers import DistilBertTokenizer, DistilBertForSequenceClassification |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import classification_report |
| from transformers import BertTokenizer |
|
|
|
|
| |
# --- Data loading and tokenization ---------------------------------------
# Read the labelled e-mail corpus. The code below reads the 'label' and
# 'text' columns of this CSV.
file_path = 'spam_ham_dataset.csv'
df = pd.read_csv(file_path)

# Encode the string labels as integers: ham -> 0, spam -> 1.
df['label_num'] = df['label'].map({'ham': 0, 'spam': 1})

# Series.map yields NaN for any value missing from the mapping, which would
# silently turn the label tensor into floats. Fail loudly instead.
if df['label_num'].isna().any():
    unexpected = df.loc[df['label_num'].isna(), 'label'].unique().tolist()
    raise ValueError(f"Unexpected label values in {file_path}: {unexpected}")

tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Tokenize the whole corpus up front; every e-mail is truncated to 128 tokens
# and padded so the result is a rectangular set of tensors.
encodings = tokenizer(
    df['text'].tolist(),
    padding=True,
    truncation=True,
    max_length=128,
    return_tensors="pt",
)
# Class indices must be integer (long) for nn.CrossEntropyLoss.
labels = torch.tensor(df['label_num'].to_numpy(), dtype=torch.long)
|
|
| |
class SpamDataset(Dataset):
    """Map-style dataset pairing pre-tokenized inputs with their labels.

    Args:
        encodings: Mapping from field name to an indexable collection of
            per-sample tensors (anything exposing ``.items()``).
        labels: Indexable collection of integer class labels, one per sample.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Return the number of samples, taken from the label collection."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of its encoded fields plus 'labels'."""
        item = {key: val[idx] for key, val in self.encodings.items()}
        # torch.tensor() on an existing tensor triggers a copy-construct
        # UserWarning for every sample; as_tensor accepts tensors, numpy
        # scalars and plain ints alike, and clone() keeps the returned label
        # independent of the stored one.
        item['labels'] = torch.as_tensor(self.labels[idx], dtype=torch.long).clone()
        return item
|
|
| |
# Wrap the tokenized corpus so it can be indexed sample by sample.
dataset = SpamDataset(encodings, labels)

# 80/20 train/validation split. The generator is seeded so the same samples
# land in the validation set on every run; an unseeded split changes each
# time, so a checkpoint reloaded in a later run would be "validated" on
# e-mails it was trained on.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)
|
|
| |
def collate_fn(batch):
    """Stack a list of per-sample dicts into one dict of batched tensors.

    The field names are taken from the first sample; each field's tensors are
    stacked along a new leading dimension.
    """
    first_sample = batch[0]
    stacked = {}
    for field in first_sample.keys():
        stacked[field] = torch.stack([sample[field] for sample in batch])
    return stacked
|
|
# Both loaders share batch size and collation; only the training loader
# shuffles.
_loader_kwargs = {"batch_size": 16, "collate_fn": collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Pretrained DistilBERT with a two-way classification head (ham / spam).
model = DistilBertForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=2,
)
model.to(device)

# AdamW with weight decay, optimizing cross-entropy over the two classes.
optimizer = optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.01)
loss_fn = nn.CrossEntropyLoss()
|
|
| |
# Fine-tune for a fixed number of passes over the training set, reporting the
# mean batch loss after each pass.
EPOCHS = 10
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0

    for batch in train_loader:
        # Separate the targets from the keyword arguments the model takes,
        # moving everything onto the training device.
        labels = batch["labels"].to(device)
        inputs = {
            name: tensor.to(device)
            for name, tensor in batch.items()
            if name != "labels"
        }

        optimizer.zero_grad()
        outputs = model(**inputs)
        loss = loss_fn(outputs.logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")
|
|
| |
# Persist the fine-tuned weights (state dict only, not the full model object).
torch.save(model.state_dict(), "distilbert_spam_model.pt")

# Quick accuracy check on the held-out split: no gradients, eval mode.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for batch in val_loader:
        inputs = {key: val.to(device) for key, val in batch.items()}
        # Already on `device` after the comprehension above.
        labels = inputs.pop("labels")

        outputs = model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=1)
        correct += (predictions == labels).sum().item()
        total += labels.size(0)

# Guard against an empty validation loader, matching the check used in
# evaluate_model_with_report; the bare division raised ZeroDivisionError.
accuracy = correct / total if total > 0 else 0
print(f"Validation Accuracy: {accuracy:.4f}")
|
|
|
|
|
|
| |
def classify_email(email_text, max_length=128):
    """Classify a single e-mail as spam or ham with the fine-tuned model.

    Args:
        email_text: Raw text of one e-mail.
        max_length: Maximum number of tokens kept after truncation. Defaults
            to 128, the limit used when the training encodings in this script
            were built, so inference sees inputs of the same length the model
            was fine-tuned on.

    Returns:
        dict with two keys: ``"result"`` (``"Spam"`` or ``"Ham"``) and
        ``"confidence"`` (the predicted class's softmax probability as a
        percentage string with two decimals, e.g. ``"97.31%"``).
    """
    model.eval()

    with torch.no_grad():
        inputs = tokenizer(
            email_text,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        )
        # The model lives on `device`; its inputs must be moved there too.
        inputs = {key: val.to(device) for key, val in inputs.items()}

        logits = model(**inputs).logits
        predictions = torch.argmax(logits, dim=1)
        predicted_class = predictions.item()

        # Read the probability of the class actually predicted rather than
        # the global maximum of the probability tensor.
        probs = F.softmax(logits, dim=1)
        confidence = probs[0, predicted_class].item() * 100

    result = "Spam" if predicted_class == 1 else "Ham"

    return {
        "result": result,
        "confidence": f"{confidence:.2f}%",
    }
|
|
| |
def evaluate_model_with_report(val_loader):
    """Evaluate the model on ``val_loader`` and print accuracy plus a report.

    Args:
        val_loader: Iterable of batches; each batch is a dict of tensors that
            includes a ``"labels"`` entry alongside the model inputs.

    Returns:
        Fraction of correctly classified samples, or 0 when the loader yields
        no samples.
    """
    model.eval()
    y_true = []
    y_pred = []
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in val_loader:
            inputs = {key: val.to(device) for key, val in batch.items()}
            # Already on `device` after the comprehension above.
            labels = inputs.pop("labels")

            outputs = model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=1)

            # Collect plain Python ints for scikit-learn.
            y_true.extend(labels.cpu().tolist())
            y_pred.extend(predictions.cpu().tolist())

            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total if total > 0 else 0
    print(f"Validation Accuracy: {accuracy:.4f}")

    print("\nClassification Report:")
    if total > 0:
        # Pin the label set explicitly: with only target_names given,
        # classification_report raises when the data happens to contain a
        # single class (one label, two names).
        print(
            classification_report(
                y_true,
                y_pred,
                labels=[0, 1],
                target_names=["Ham", "Spam"],
            )
        )
    else:
        print("No validation samples to report on.")

    return accuracy
|
|
| |
# Run the detailed evaluation on the validation loader and echo the headline
# accuracy it returns.
accuracy = evaluate_model_with_report(val_loader=val_loader)
print(f"Model Validation Accuracy: {accuracy:.4f}")
|
|
| |
|
|
| import gradio as gr |
|
|
| |
def create_interface():
    """Assemble the Gradio Blocks UI for the spam classifier and return it.

    The page holds an e-mail input box, three result fields filled by the
    analyze button, a static panel of performance metrics with a confusion
    matrix image, and a glossary.

    NOTE(review): ``generate_performance_metrics``, ``custom_css`` and
    ``email_analysis_pipeline`` are used here but are not defined in the
    visible part of this file -- confirm they exist before this is called.
    """
    performance_metrics = generate_performance_metrics()

    # (key in performance_metrics, textbox label), in display order.
    metric_fields = (
        ("accuracy", "Accuracy"),
        ("precision", "Precision"),
        ("recall", "Recall"),
        ("f1_score", "F1 Score"),
    )

    with gr.Blocks(css=custom_css) as interface:
        # Header
        gr.Markdown("Spam Email Classification")
        gr.Markdown(
            """
            Brief description of the project here

            """
        )

        # Input area
        with gr.Row():
            email_input = gr.Textbox(
                lines=8,
                placeholder="Type or paste your email content here...",
                label="Email Content",
            )

        # Output area, filled in by the button callback below
        with gr.Row():
            result_output = gr.HTML(label="Classification Result")
            confidence_output = gr.Textbox(label="Confidence Score", interactive=False)
            accuracy_output = gr.Textbox(label="Accuracy", interactive=False)

        analyze_button = gr.Button("Analyze Email 🕵️♂️")
        result_components = [result_output, confidence_output, accuracy_output]
        analyze_button.click(
            fn=email_analysis_pipeline,
            inputs=email_input,
            outputs=result_components,
        )

        # Static performance panel: metric boxes on the left, plot on the right
        gr.Markdown("## 📊 Model Performance Analytics")
        with gr.Row():
            with gr.Column():
                for metric_key, metric_label in metric_fields:
                    gr.Textbox(
                        value=performance_metrics[metric_key],
                        label=metric_label,
                        interactive=False,
                        elem_classes=["metric"],
                    )
            with gr.Column():
                gr.Markdown("### Confusion Matrix")
                # The plot value is embedded as a base64 PNG data URI.
                gr.HTML(f"<img src='data:image/png;base64,{performance_metrics['confusion_matrix_plot']}' style='max-width: 100%; height: auto;' />")

        gr.Markdown("## 📘 Glossary and Explanation of Labels")
        gr.Markdown(
            """
            ### Labels:
            - **Spam:** Unwanted or harmful emails flagged by the system.
            - **Ham:** Legitimate, safe emails.

            ### Metrics:
            - **Accuracy:** The percentage of correct classifications.
            - **Precision:** Out of predicted Spam, how many are actually Spam.
            - **Recall:** Out of all actual Spam emails, how many are predicted as Spam.
            - **F1 Score:** Harmonic mean of Precision and Recall.
            """
        )

    return interface
|
|
| |
# Build the Blocks app once, then start serving it. share=True asks Gradio
# for a publicly reachable link in addition to the local server.
interface = create_interface()
interface.launch(share=True)
|
|