|
import streamlit as st |
|
import email |
|
from email.policy import default |
|
import re |
|
import pickle |
|
from nltk.tokenize import word_tokenize |
|
from nltk.corpus import stopwords |
|
from nltk.stem import WordNetLemmatizer |
|
|
|
|
|
|
|
class HeaderAnalyzer:
    """Summarise an email's headers and derive a simple spam score from them."""

    def analyze_header(self, header):
        """Collect the main addressing headers together with a spam score.

        Args:
            header: Any object exposing ``get(name)`` for header lookup
                (a plain ``dict`` works).

        Returns:
            dict: keys ``sender``, ``subject``, ``to`` and ``date`` hold
            whatever ``header.get`` returns for ``From``, ``Subject``, ``To``
            and ``Date``; ``spam_score`` holds the result of
            :meth:`calculate_spam_score`.
        """
        return {
            "sender": header.get("From"),
            "subject": header.get("Subject"),
            "to": header.get("To"),
            "date": header.get("Date"),
            "spam_score": self.calculate_spam_score(header),
        }

    def calculate_spam_score(self, header):
        """Compute a spam score from the ``X-Spam-*`` headers.

        The numeric value of ``X-Spam-Score`` (when present and parseable) is
        used as the base, and 1 is added when ``X-Spam-Flag`` equals "yes"
        (case-insensitive, surrounding whitespace ignored).

        Args:
            header: Any object exposing ``get(name)`` for header lookup.

        Returns:
            int | float: ``0`` when neither header contributes.
        """
        spam_score = 0

        raw_score = header.get("X-Spam-Score")
        if raw_score:
            try:
                numeric_score = float(raw_score)
            except (TypeError, ValueError):
                # The header value comes straight from the message and is not
                # guaranteed to be numeric; a malformed value contributes
                # nothing instead of aborting the whole analysis.
                numeric_score = 0.0
            spam_score += numeric_score

        spam_flag = header.get("X-Spam-Flag")
        if spam_flag and str(spam_flag).strip().lower() == "yes":
            spam_score += 1

        return spam_score
|
|
|
|
|
class SpamDetector:
    """Wrap two pickled models: one for email subjects, one for URLs.

    Both models are read from the ``verdict/`` directory when the detector is
    constructed.

    NOTE(review): ``predict`` is handed a single preprocessed string. Many
    text models expect an iterable of documents instead -- confirm against
    how the pickled models were trained.
    """

    def __init__(self):
        # NOTE: pickle.load can execute arbitrary code; only ship model files
        # that come from a trusted source.
        self.subj_model = self._load_model("verdict/email_subj_model.pkl")
        self.url_model = self._load_model("verdict/phishing.pkl")

    @staticmethod
    def _load_model(path):
        """Unpickle and return the object stored at *path*."""
        with open(path, "rb") as model_file:
            return pickle.load(model_file)

    def predict_subject(self, subject):
        """Return the subject model's prediction for the preprocessed *subject*."""
        return self.subj_model.predict(preprocess_subject(subject))

    def predict_url(self, url):
        """Return the URL model's prediction for the preprocessed *url*."""
        return self.url_model.predict(preprocess_url(url))
|
|
|
def preprocess_subject(subject):
    """Normalise an email subject into a space-separated string of lemmas.

    Steps: lower-case the text, replace every character that is not an ASCII
    letter with a space, tokenize with NLTK, drop English stop words,
    lemmatize each remaining token with WordNet and re-join with spaces.

    Args:
        subject: Subject text. ``None`` is accepted and treated as an empty
            subject.

    Returns:
        str: The processed subject (empty when no tokens remain).
    """
    if subject is None:
        # extract_subject() returns msg['Subject'], which is None for a
        # message that has no Subject header; avoid crashing on .lower().
        subject = ""

    letters_only = re.sub(r"[^a-zA-Z]", " ", subject.lower())
    tokens = word_tokenize(letters_only)

    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()

    return " ".join(
        lemmatizer.lemmatize(token)
        for token in tokens
        if token not in stop_words
    )
|
|
|
|
|
def preprocess_url(url):
    """Reduce a URL to its lower-cased alphanumeric words.

    Every character that is not an ASCII letter or digit becomes a separator,
    the pieces are split on whitespace, English stop words are removed, and
    the remaining words are joined with single spaces.
    """
    separated = re.sub(r"[^a-zA-Z0-9]", " ", url.lower())
    words = separated.split()

    english_stop_words = set(stopwords.words("english"))
    return " ".join(
        word for word in words if word not in english_stop_words
    )
|
|
|
|
|
|
|
def home(uploaded_file):
    """Render the landing-page header, depending on whether a file is uploaded."""
    message = (
        'Begin exploring the data using the menu on the left'
        if uploaded_file
        else 'To begin, please upload an EML file'
    )
    st.header(message)
|
|
|
|
|
def extract_subject(eml):
    """Return the Subject header of the EML file at path *eml*.

    The file is parsed with ``email.policy.default``. The result is ``None``
    when the message carries no Subject header.
    """
    with open(eml, 'rb') as eml_file:
        message = email.message_from_binary_file(eml_file, policy=default)
    return message.get('Subject')
|
|
|
|
|
def extract_attachments(eml):
    """List attachment filenames and the URLs found in text attachments.

    Args:
        eml: Path of the EML file to parse.

    Returns:
        tuple[list, list]: ``(attachments, urls)`` where ``attachments`` holds
        the filename of every attachment part that has one, and ``urls`` holds
        every ``http://`` / ``https://`` match found in the content of
        attachment parts whose content type starts with ``text/``.
    """
    with open(eml, 'rb') as eml_file:
        message = email.message_from_binary_file(eml_file, policy=default)

    attachments = []
    urls = []
    for attachment in message.iter_attachments():
        name = attachment.get_filename()
        if name:
            attachments.append(name)

        # Only textual parts are scanned for links.
        if not attachment.get_content_type().startswith("text/"):
            continue
        urls += re.findall(r'(https?://\S+)', attachment.get_content())

    return attachments, urls
|
|
|
|
|
def extract_headers(eml):
    """Return the headers of the EML file at path *eml* as a dict.

    Keys are header names and values are the parsed header values. When a
    header name occurs more than once, only its last value is kept.
    """
    with open(eml, 'rb') as eml_file:
        message = email.message_from_binary_file(eml_file, policy=default)
    return dict(message.items())
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Streamlit page: title, sidebar (upload + navigation), then the selected view.
# ---------------------------------------------------------------------------
st.title('Email Phishing Explorer')
st.text('This is a web app to allow exploration of phishing emails')

st.sidebar.title('Sidebar')
upload_file = st.sidebar.file_uploader('Upload an EML file')

st.sidebar.title('Navigation')
options = st.sidebar.radio(
    'Select what you want to display:',
    ['Home', 'Email Subject', 'Email Attachments', 'Email Headers'],
)

if upload_file is not None:
    # The extract_* helpers take a file path, so the upload is written to disk.
    eml_path = 'uploaded.eml'
    with open(eml_path, 'wb') as f:
        # getvalue() returns the complete buffer regardless of the current
        # stream position, whereas read() only returns what is left after it.
        f.write(upload_file.getvalue())

header_analyzer = HeaderAnalyzer()
spam_detector = SpamDetector()

if options == 'Home':
    home(upload_file)
elif options == 'Email Subject':
    if upload_file is not None:
        subject = extract_subject(eml_path)
        st.header('Email Subject Verdict')
        st.write(f'Subject: {subject}')
        # NOTE(review): only the Subject is passed here, so the X-Spam-*
        # headers read by calculate_spam_score are never present and this
        # score is always 0 -- confirm what this page is meant to show.
        subject_analysis = header_analyzer.analyze_header({'Subject': subject})
        st.write(f'Spam Score: {subject_analysis["spam_score"]}')
    else:
        st.warning('Please upload an EML file first.')
elif options == 'Email Attachments':
    if upload_file is not None:
        attachments, urls = extract_attachments(eml_path)
        st.header('Email Attachments Verdict')

        if attachments:
            st.write('Attachments:')
            for attachment in attachments:
                st.write(attachment)
        else:
            st.write('No attachments found.')

        if urls:
            st.write('URLs in Attachments:')
            for url in urls:
                st.write(url)
        else:
            st.write('No URLs found in attachments.')
    else:
        st.warning('Please upload an EML file first.')
elif options == 'Email Headers':
    if upload_file is not None:
        headers = extract_headers(eml_path)
        st.header('Email Headers Verdict')

        if headers:
            st.write('Headers:')
            header_analysis = header_analyzer.analyze_header(headers)
            for key, value in headers.items():
                st.write(f'{key}: {value}')
            st.write(f'Spam Score: {header_analysis["spam_score"]}')
        else:
            st.write('No headers found.')
    else:
        st.warning('Please upload an EML file first.')
|
|