# spamdet/spibx.py — uploaded by manu1612 ("Upload 5 files", commit 884bd6d)
import streamlit as st
import email
from email.policy import default
import re
import pickle
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
class HeaderAnalyzer:
    """Rule-based analyzer for e-mail headers.

    Works on any mapping with a ``get`` method (a plain dict or an
    ``email.message.Message``-like object).
    """

    def __init__(self):
        pass

    def analyze_header(self, header):
        """Extract key header fields and attach a heuristic spam score.

        Parameters
        ----------
        header : mapping
            Header mapping; missing fields yield ``None``.

        Returns
        -------
        dict
            ``sender``/``subject``/``to``/``date`` plus ``spam_score``.
        """
        return {
            "sender": header.get("From"),
            "subject": header.get("Subject"),
            "to": header.get("To"),
            "date": header.get("Date"),
            "spam_score": self.calculate_spam_score(header),
        }

    def calculate_spam_score(self, header):
        """Compute a heuristic spam score from X-Spam-* headers.

        Adds the numeric ``X-Spam-Score`` value if present and parseable,
        and +1 when ``X-Spam-Flag`` is ``yes`` (case-insensitive).
        """
        spam_score = 0
        score_value = header.get("X-Spam-Score")
        if score_value:
            try:
                spam_score += float(score_value)
            except (TypeError, ValueError):
                # Malformed upstream score header: ignore rather than crash.
                pass
        spam_flag = header.get("X-Spam-Flag")
        if spam_flag and spam_flag.lower() == "yes":
            spam_score += 1
        return spam_score
class SpamDetector:
    """Thin wrapper around the two pickled spam-classification models."""

    def __init__(self):
        # NOTE(review): pickle.load assumes these files are trusted — never
        # point this at untrusted data.  The models' expected predict()
        # input format is not visible from this file; confirm whether they
        # take a raw string or a list of documents.
        with open("verdict/email_subj_model.pkl", "rb") as model_file:
            self.subj_model = pickle.load(model_file)
        with open("verdict/phishing.pkl", "rb") as model_file:
            self.url_model = pickle.load(model_file)

    def predict_subject(self, subject):
        """Classify a subject line and return the subject model's prediction."""
        cleaned = preprocess_subject(subject)
        return self.subj_model.predict(cleaned)

    def predict_url(self, url):
        """Classify a URL and return the phishing model's prediction."""
        cleaned = preprocess_url(url)
        return self.url_model.predict(cleaned)
def preprocess_subject(subject):
    """Normalize a subject line for the subject model.

    Lowercases, strips non-letters, tokenizes, drops English stopwords,
    and lemmatizes; returns the surviving tokens joined by spaces.
    """
    # Lowercase first, then blank out everything that is not a letter.
    letters_only = re.sub(r"[^a-zA-Z]", " ", subject.lower())
    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()
    kept = [
        lemmatizer.lemmatize(token)
        for token in word_tokenize(letters_only)
        if token not in stop_words
    ]
    return " ".join(kept)
def preprocess_url(url):
    """Normalize a URL for the phishing model.

    Lowercases, replaces non-alphanumerics with spaces, splits on
    whitespace, removes English stopwords, and rejoins with spaces.
    """
    alnum_only = re.sub(r"[^a-zA-Z0-9]", " ", url.lower())
    stop_words = set(stopwords.words("english"))
    kept = [token for token in alnum_only.split() if token not in stop_words]
    return " ".join(kept)
def home(uploaded_file):
    """Render the landing-page message for the Home view.

    Prompts for an upload until a file object is present.
    """
    if not uploaded_file:
        st.header('To begin, please upload an EML file')
    else:
        st.header('Begin exploring the data using the menu on the left')
def extract_subject(eml):
    """Parse the EML file at path ``eml`` and return its Subject header.

    Returns ``None`` when the message has no Subject header.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)
    return message['Subject']
def extract_attachments(eml):
    """Collect attachment filenames and URLs from an EML file.

    Returns a ``(filenames, urls)`` tuple; URLs are scraped (via regex)
    only from attachment parts with a ``text/*`` content type.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)
    filenames = []
    found_urls = []
    for part in message.iter_attachments():
        name = part.get_filename()
        if name:
            filenames.append(name)
        if part.get_content_type().startswith("text/"):
            body_text = part.get_content()
            found_urls.extend(re.findall(r'(https?://\S+)', body_text))
    return filenames, found_urls
def extract_headers(eml):
    """Parse the EML file at path ``eml`` and return its headers as a dict.

    Duplicate header names (e.g. multiple ``Received`` lines) collapse to
    the last occurrence — the same last-wins behavior as the original
    key-by-key loop, now expressed with the idiomatic ``dict()`` call.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)
    return dict(message.items())
# ---------------------------------------------------------------------------
# Streamlit page: title, sidebar, and per-page dispatch.
# ---------------------------------------------------------------------------

# Title and intro text.
st.title('Email Phishing Explorer')
st.text('This is a web app to allow exploration of phishing emails')

# Sidebar setup: uploader plus page navigation.
st.sidebar.title('Sidebar')
upload_file = st.sidebar.file_uploader('Upload an EML file')
st.sidebar.title('Navigation')
options = st.sidebar.radio('Select what you want to display:',
                           ['Home', 'Email Subject', 'Email Attachments', 'Email Headers'])

# Persist the upload to disk so the path-based extract_* helpers can parse
# it.  eml_path is now always bound (the original only defined it inside the
# upload branch), so no branch can hit an unbound name.
eml_path = 'uploaded.eml'
if upload_file is not None:
    with open(eml_path, 'wb') as f:
        f.write(upload_file.read())

header_analyzer = HeaderAnalyzer()
# NOTE(review): spam_detector loads both pickled models eagerly but is never
# used below; if the "verdict/" files are missing, this line crashes the
# whole app.  Kept for backward compatibility — consider lazy creation.
spam_detector = SpamDetector()

# Page dispatch.  The per-branch "is the file uploaded?" checks are
# collapsed into a single guard: every page except Home needs the file.
if options == 'Home':
    home(upload_file)
elif upload_file is None:
    st.warning('Please upload an EML file first.')
elif options == 'Email Subject':
    subject = extract_subject(eml_path)
    st.header('Email Subject Verdict')
    st.write(f'Subject: {subject}')
    subject_analysis = header_analyzer.analyze_header({'Subject': subject})
    st.write(f'Spam Score: {subject_analysis["spam_score"]}')
elif options == 'Email Attachments':
    attachments, urls = extract_attachments(eml_path)
    st.header('Email Attachments Verdict')
    if attachments:
        st.write('Attachments:')
        for attachment in attachments:
            st.write(attachment)
    else:
        st.write('No attachments found.')
    if urls:
        st.write('URLs in Attachments:')
        for url in urls:
            st.write(url)
    else:
        st.write('No URLs found in attachments.')
elif options == 'Email Headers':
    headers = extract_headers(eml_path)
    st.header('Email Headers Verdict')
    if headers:
        st.write('Headers:')
        header_analysis = header_analyzer.analyze_header(headers)
        for key, value in headers.items():
            st.write(f'{key}: {value}')
        st.write(f'Spam Score: {header_analysis["spam_score"]}')
    else:
        st.write('No headers found.')