"""FastAPI service that scores emails for phishing risk and runs a small quiz.

Endpoints:
    GET  /         -- renders ``index.html``.
    GET  /quiz     -- renders ``quiz.html``.
    POST /predict  -- classifies an email and, optionally, plays a quiz round.

The classifier, TF-IDF vectorizer and quiz dataset are loaded once at import
time from files in the working directory. A load failure is printed and the
corresponding global is left as ``None``.
"""
import json
import os
import random
import re
import traceback
import uuid
from typing import Optional

import joblib
import nltk
import pandas as pd
import uvicorn
from fastapi import FastAPI, Request, Header
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from langcodes import Language
from langdetect import detect_langs, DetectorFactory, LangDetectException
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from pydantic import BaseModel

# Fixed seed so langdetect returns the same result for the same text.
DetectorFactory.seed = 0

nltk.download("punkt")
nltk.download("wordnet")

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Load model and vectorizer.
# NOTE(review): joblib.load unpickles the file; only load trusted artifacts.
try:
    model = joblib.load("phishing_classifier.pkl")
    vectorizer = joblib.load("tfidf_vectorizer.pkl")
    print("Model and vectorizer loaded successfully.")
except Exception as e:
    print(f"Error loading model/vectorizer: {e}")
    model = None
    vectorizer = None

# Load quiz dataset
try:
    dataset = pd.read_csv("full_test_predictions_cleaned.csv").dropna(
        subset=["Subject", "Body", "Label"]
    )
    dataset = dataset[["Subject", "Body", "Label"]]
    print("Dataset loaded successfully.")
except Exception as e:
    print(f"Error loading dataset: {e}")
    dataset = None

# session id -> {"correct": int, "total": int}; kept in memory only.
quiz_sessions = {}

# session id -> ground-truth label of the quiz question most recently served
# to that session, so a later guess is graded against the question shown.
_pending_quiz_labels = {}

# Minimum number of whitespace-separated tokens needed before we attempt
# language detection or classification.
MIN_TOKENS = 4
# Score (0-100) at or above which an email is flagged as phishing.
PHISHING_THRESHOLD = 60
# Points subtracted when an email mentions both "invoice" and "attached".
INVOICE_PATCH_PENALTY = 30
# langdetect probability at or above which the language is reported plainly.
LANG_CONFIDENCE_THRESHOLD = 0.85
# Word count at or above which a low-confidence language guess is still shown.
LANG_LOW_CONFIDENCE_MIN_WORDS = 20
# Maximum number of quiz-body characters returned before truncation.
QUIZ_BODY_LIMIT = 500

SECURITY_TIPS = [
    "Check email sender addresses carefully.",
    "Look for spelling mistakes in suspicious emails.",
    "Hover over links before clicking to check the destination.",
    "Never share your password with anyone.",
    "Report suspicious emails to your IT/security team.",
    "Verify attachments before downloading.",
]

_URL_RE = re.compile(r"http\S+|www\S+|https\S+")
_NON_LETTER_RE = re.compile(r"[^A-Za-z\s]")
_HEADER_RE = re.compile(r"(?i)(Message-ID|X-\w+|Date|To|From|Subject):.*")
_WHITESPACE_RE = re.compile(r"\s+")

# Built once instead of on every request.
_lemmatizer = WordNetLemmatizer()


class EmailRequest(BaseModel):
    """JSON body accepted by ``POST /predict``."""

    subject: str
    body: str
    # When true, the response also carries a quiz question.
    play_game: Optional[bool] = False
    # "yes" (phishing) or "no" (not phishing); compared case-insensitively.
    user_guess: Optional[str] = None


def preprocess(text):
    """Return *text* as a space-joined string of lowercase, lemmatized tokens.

    URLs are removed first, then every character that is not an ASCII letter
    or whitespace. Tokenization uses ``preserve_line=True``, i.e. without a
    sentence-splitting pass.

    NOTE(review): presumably this must stay identical to the preprocessing
    used when the vectorizer was fitted -- confirm before changing it.
    """
    text = _URL_RE.sub("", text)
    text = _NON_LETTER_RE.sub("", text)
    tokens = word_tokenize(text.lower(), preserve_line=True)
    return " ".join(_lemmatizer.lemmatize(token) for token in tokens)


# Improved language detection
def detect_language(text):
    """Return a display name for the language of *text*, or ``"Unknown"``.

    Texts with fewer than ``MIN_TOKENS`` words are not analysed. A top guess
    below the confidence threshold is reported with a ``" (low confidence)"``
    suffix only when the text has at least 20 words.
    """
    words = text.strip().split()
    if len(words) < MIN_TOKENS:
        return "Unknown"

    try:
        top = detect_langs(text)[0]
    except LangDetectException:
        return "Unknown"

    if top.prob >= LANG_CONFIDENCE_THRESHOLD:
        return Language.get(top.lang).display_name()
    if len(words) >= LANG_LOW_CONFIDENCE_MIN_WORDS:
        return Language.get(top.lang).display_name() + " (low confidence)"
    return "Unknown"


# Classification with safe-word patch
def classify_email(subject, body):
    """Score an email and return ``(is_phishing, score)``.

    ``score`` is the model's probability for class index 1 scaled to 0-100
    and rounded to two decimals. Emails with fewer than ``MIN_TOKENS``
    preprocessed tokens are returned as ``(False, 0.0)`` without being scored.

    Raises:
        ValueError: if the model or vectorizer failed to load at startup.
    """
    if model is None or vectorizer is None:
        raise ValueError("Model or vectorizer not loaded.")

    combined = f"{subject} {body}"
    combined_lower = combined.lower()
    processed = preprocess(combined)

    if len(processed.split()) < MIN_TOKENS:
        return False, 0.0

    features = vectorizer.transform([processed])
    score = model.predict_proba(features)[0][1] * 100

    # Patch: lower score for invoice + attached
    mentions_invoice = "invoice" in combined_lower and "attached" in combined_lower
    if mentions_invoice and score > PHISHING_THRESHOLD:
        score -= INVOICE_PATCH_PENALTY

    phishing = score >= PHISHING_THRESHOLD
    return phishing, round(score, 2)


def _normalize_label(raw):
    """Map a dataset label to ``"phishing"``/``"not_phishing"``, else ``None``.

    Accepts the textual labels and any numeric spelling of 1 or 0. The numeric
    path matters because a label column that contained missing values is read
    by pandas as float, so its values stringify as ``"1.0"`` / ``"0.0"``.
    """
    label = str(raw).strip().lower()
    if label in ("1", "phishing"):
        return "phishing"
    if label in ("0", "not_phishing"):
        return "not_phishing"
    try:
        numeric = float(label)
    except ValueError:
        return None
    if numeric == 1:
        return "phishing"
    if numeric == 0:
        return "not_phishing"
    return None


# Random quiz question from both phishing + non-phishing
def get_quiz_question():
    """Return a random quiz question dict, or ``None`` if no dataset is loaded.

    The dict has the keys ``subject``, ``body`` and ``label``. Header-like
    fragments (``From:``, ``Date:``, ``X-...:`` and similar) are removed from
    the body up to the end of their line, whitespace is collapsed, and the
    result is cut to 500 characters followed by ``"..."`` when longer.
    """
    if dataset is None:
        return None

    sample = dataset.sample(1).iloc[0]

    clean_body = _HEADER_RE.sub("", str(sample["Body"]).strip())
    clean_body = _WHITESPACE_RE.sub(" ", clean_body).strip()
    if len(clean_body) > QUIZ_BODY_LIMIT:
        body = clean_body[:QUIZ_BODY_LIMIT] + "..."
    else:
        body = clean_body

    label = _normalize_label(sample["Label"])
    if label is None:
        # Fallback randomness for labels that cannot be interpreted.
        label = random.choice(["phishing", "not_phishing"])

    return {
        "subject": str(sample["Subject"]) or "No Subject",
        "body": body,
        "label": label,
    }


def update_quiz_score(session_id, correct):
    """Record one answered question for *session_id*, creating it if needed."""
    tally = quiz_sessions.setdefault(session_id, {"correct": 0, "total": 0})
    tally["total"] += 1
    if correct:
        tally["correct"] += 1


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the landing page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/quiz", response_class=HTMLResponse)
async def quiz_page(request: Request):
    """Render the quiz page."""
    return templates.TemplateResponse("quiz.html", {"request": request})


def _risk_label(score):
    """Return the tiered risk label for a 0-100 *score*."""
    if score >= 90:
        return "Very High Risk Phishing"
    if score >= 80:
        return "High Risk Phishing"
    if score >= 50:
        return "Suspicious"
    return "Non-Phishing"


def _format_score(score):
    """Format *score* as a percentage: no decimals if whole, else one."""
    if score == int(score):
        return f"{int(score)}%"
    return f"{score:.1f}%"


def _play_quiz_round(result, session_id, user_guess):
    """Fill the quiz fields of *result* in place for one quiz round.

    A new question is always drawn and returned. If *user_guess* is given, it
    is graded against the question previously served to this session; when no
    such question is known (for example the client sent no session id), it is
    graded against the newly drawn question, as this endpoint always did.
    """
    session = session_id or str(uuid.uuid4())
    question = get_quiz_question()
    result["quiz"] = question
    result["session_id"] = session

    if user_guess and question:
        expected = _pending_quiz_labels.get(session, question["label"])
        guess = user_guess.strip().lower()
        correct = (
            (guess == "yes" and expected == "phishing")
            or (guess == "no" and expected == "not_phishing")
        )
        update_quiz_score(session, correct)
        result["quiz_result"] = (
            "Quack Quack! Correct! You're learning each day."
            if correct
            else "Oops! That was incorrect. Stay sharp, detective!"
        )
        result["score"] = quiz_sessions[session]

    # Remember what was just served so the next guess is graded against it.
    if question:
        _pending_quiz_labels[session] = question["label"]


@app.post("/predict")
async def predict_email(request: EmailRequest, session_id: Optional[str] = Header(None)):
    """Classify an email and optionally play one quiz round.

    Returns a 400 response when both subject and body are blank, and a 500
    response carrying the exception text when anything else fails (the
    traceback is printed server-side).
    """
    try:
        subject = request.subject
        body = request.body

        if not subject.strip() and not body.strip():
            return JSONResponse(
                status_code=400,
                content={"error": "Email subject and body cannot be empty."},
            )

        phishing, score = classify_email(subject, body)

        explanation = (
            "Quack Quack! This email contains urgent language, financial threats, or suspicious links."
            if phishing
            else "Quack Quack! This email is likely safe but always verify sender details."
        )

        result = {
            "phishing": bool(phishing),
            "phishing_score": _format_score(score),
            "label": _risk_label(score),
            "explanation": explanation,
            "language": detect_language(f"{subject} {body}"),
            "did_you_know": random.choice(SECURITY_TIPS),
            "quiz": None,
            "quiz_result": None,
            "session_id": None,
            "score": None,
        }

        if request.play_game:
            _play_quiz_round(result, session_id, request.user_guess)

        return JSONResponse(content=result)

    except Exception as e:
        # Top-level boundary: log the traceback and report the failure.
        traceback.print_exc()
        return JSONResponse(status_code=500, content={"error": str(e)})


if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)