# (removed pasted file-listing artifact: file size, commit hash, and a
#  line-number gutter that were not part of the program)
import streamlit as st
import email
from email.policy import default
import re
import pickle
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
class HeaderAnalyzer:
    """Rule-based analysis of email headers, producing a simple spam score."""

    def analyze_header(self, header):
        """Summarize commonly-inspected fields of *header* plus a spam score.

        *header* is any mapping with a ``.get`` method (a plain dict or an
        ``email.message.Message``); missing fields come back as ``None``.

        Returns a dict with keys ``sender``, ``subject``, ``to``, ``date``
        and ``spam_score``.
        """
        return {
            "sender": header.get("From"),
            "subject": header.get("Subject"),
            "to": header.get("To"),
            "date": header.get("Date"),
            "spam_score": self.calculate_spam_score(header),
        }

    def calculate_spam_score(self, header):
        """Heuristic spam score derived from upstream filter headers.

        Adds any numeric ``X-Spam-Score`` value, plus 1 when ``X-Spam-Flag``
        is "yes" (case-insensitive). Header values arrive from untrusted
        email, so a malformed score is ignored rather than raising
        (the original raised ValueError on non-numeric X-Spam-Score).
        """
        spam_score = 0
        raw_score = header.get("X-Spam-Score")
        if raw_score:
            try:
                spam_score += float(raw_score)
            except (TypeError, ValueError):
                pass  # malformed upstream score: contribute nothing
        spam_flag = header.get("X-Spam-Flag")
        if spam_flag and spam_flag.lower() == "yes":
            spam_score += 1
        return spam_score
class SpamDetector:
    """Wraps the pickled subject and URL spam-classification models."""

    def __init__(self):
        # Load the pre-trained models from disk.
        # NOTE(review): pickle.load executes arbitrary code — only load
        # these model files from a trusted source.
        with open("verdict/email_subj_model.pkl", "rb") as f:
            self.subj_model = pickle.load(f)
        with open("verdict/phishing.pkl", "rb") as f:
            self.url_model = pickle.load(f)

    def predict_subject(self, subject):
        """Predict spamminess of an email subject line.

        The cleaned subject is wrapped in a one-element list: an
        sklearn-style text pipeline expects an iterable of documents, and a
        bare string would be iterated character by character (the original
        passed the raw string). Returns the model's prediction for that
        single sample.
        NOTE(review): assumes the pickled model follows the sklearn
        predict(iterable-of-documents) convention — confirm.
        """
        processed_subject = preprocess_subject(subject)
        return self.subj_model.predict([processed_subject])

    def predict_url(self, url):
        """Predict spamminess of a URL; same one-element-list convention
        as predict_subject."""
        processed_url = preprocess_url(url)
        return self.url_model.predict([processed_url])
def preprocess_subject(subject):
    """Normalize a subject line for the subject model.

    Lowercases the text, strips everything that is not a letter, drops
    English stopwords, lemmatizes what remains, and rejoins the surviving
    tokens with single spaces.
    """
    letters_only = re.sub(r"[^a-zA-Z]", " ", subject.lower())
    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()
    kept = [
        lemmatizer.lemmatize(tok)
        for tok in word_tokenize(letters_only)
        if tok not in stop_words
    ]
    return " ".join(kept)
def preprocess_url(url):
    """Normalize a URL for the URL model.

    Lowercases the URL, replaces every non-alphanumeric character with a
    space, drops English stopwords, and rejoins the remaining tokens with
    single spaces.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9]", " ", url.lower())
    stop_words = set(stopwords.words("english"))
    return " ".join(tok for tok in cleaned.split() if tok not in stop_words)
def home(uploaded_file):
    """Render the landing header, which depends on whether a file is uploaded."""
    message = (
        'Begin exploring the data using the menu on the left'
        if uploaded_file
        else 'To begin, please upload an EML file'
    )
    st.header(message)
def extract_subject(eml):
    """Return the Subject header of the EML file at path *eml*."""
    with open(eml, 'rb') as fh:
        message = email.message_from_binary_file(fh, policy=default)
    return message['Subject']
def extract_attachments(eml):
    """Collect attachment filenames and any http(s) URLs found inside
    text/* attachments of the EML file at path *eml*.

    Returns a ``(filenames, urls)`` tuple of lists; URLs are only searched
    for in attachments whose content type starts with "text/".
    """
    with open(eml, 'rb') as fh:
        message = email.message_from_binary_file(fh, policy=default)
    names = []
    found_urls = []
    url_pattern = re.compile(r'(https?://\S+)')
    for attachment in message.iter_attachments():
        name = attachment.get_filename()
        if name:
            names.append(name)
        if attachment.get_content_type().startswith("text/"):
            found_urls.extend(url_pattern.findall(attachment.get_content()))
    return names, found_urls
def extract_headers(eml):
    """Return all headers of the EML file at path *eml* as a dict.

    Note: like the dict-building loop it replaces, a repeated header name
    keeps only the last occurrence.
    """
    with open(eml, 'rb') as fh:
        message = email.message_from_binary_file(fh, policy=default)
    return {name: value for name, value in message.items()}
# ---------------------------------------------------------------------------
# Streamlit page — this script re-runs top to bottom on every interaction.
# ---------------------------------------------------------------------------
# Add a title and intro text
st.title('Email Phishing Explorer')
st.text('This is a web app to allow exploration of phishing emails')
# Sidebar setup
st.sidebar.title('Sidebar')
upload_file = st.sidebar.file_uploader('Upload an EML file')
# Sidebar navigation
st.sidebar.title('Navigation')
options = st.sidebar.radio('Select what you want to display:',
    ['Home', 'Email Subject', 'Email Attachments', 'Email Headers'])
# Check if file has been uploaded — persist the upload to a fixed local
# path so the extract_* helpers (which take a filename) can re-open it.
# NOTE(review): the hard-coded name means concurrent sessions would
# overwrite each other's upload — confirm this app is single-user.
if upload_file is not None:
    eml_path = 'uploaded.eml'
    with open(eml_path, 'wb') as f:
        f.write(upload_file.read())
# Create instances of HeaderAnalyzer and SpamDetector
# NOTE(review): SpamDetector.__init__ unpickles two model files from
# "verdict/" on every rerun, yet spam_detector is never used below —
# consider constructing it lazily; confirm the files exist at runtime.
header_analyzer = HeaderAnalyzer()
spam_detector = SpamDetector()
# Navigation options — each branch guards on upload_file before touching
# eml_path, which is only defined after a successful upload.
if options == 'Home':
    home(upload_file)
elif options == 'Email Subject':
    if upload_file is not None:
        subject = extract_subject(eml_path)
        st.header('Email Subject Verdict')
        st.write(f'Subject: {subject}')
        # Score just the Subject header via the rule-based analyzer.
        subject_analysis = header_analyzer.analyze_header({'Subject': subject})
        st.write(f'Spam Score: {subject_analysis["spam_score"]}')
    else:
        st.warning('Please upload an EML file first.')
elif options == 'Email Attachments':
    if upload_file is not None:
        attachments, urls = extract_attachments(eml_path)
        st.header('Email Attachments Verdict')
        if attachments:
            st.write('Attachments:')
            for attachment in attachments:
                st.write(attachment)
        else:
            st.write('No attachments found.')
        if urls:
            st.write('URLs in Attachments:')
            for url in urls:
                st.write(url)
        else:
            st.write('No URLs found in attachments.')
    else:
        st.warning('Please upload an EML file first.')
elif options == 'Email Headers':
    if upload_file is not None:
        headers = extract_headers(eml_path)
        st.header('Email Headers Verdict')
        if headers:
            st.write('Headers:')
            # Score the full header set, then echo each header line.
            header_analysis = header_analyzer.analyze_header(headers)
            for key, value in headers.items():
                st.write(f'{key}: {value}')
            st.write(f'Spam Score: {header_analysis["spam_score"]}')
        else:
            st.write('No headers found.')
    else:
        st.warning('Please upload an EML file first.')