# first-attempt/feature_extraction.py
import os
import dns.resolver, dns.rdatatype
import requests
from bs4 import BeautifulSoup
from collections import Counter
import whois
from datetime import datetime
import time
import csv
import ssl
import socket
from urllib.parse import urlparse
import OpenSSL.crypto
import pandas as pd
import random
def generate_user_agent() -> str:
a = random.randint(63, 89)
b = random.randint(1, 3200)
c = random.randint(1, 140)
user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{a}.0.{b}.{c} Safari/537.3'
return user_agent
headers = {
'User-Agent': generate_user_agent()
}
def count_domain_occurrences(soup: BeautifulSoup, domain: str) -> int:
"""
Returns the number of occurrences of the domain in the website's page source.
"""
try:
domain_count = soup.prettify().count(domain)
return domain_count
except Exception as e:
print(f"count_domain_occurrences: {str(e)}")
return 0
existing_isssuers = {"ACCVCA-120", "AT", "AU", "Apple Public Server ECC CA 12 - G1", "Apple Public Server RSA CA 12 - G1", "Atos TrustedRoot Server-CA 2019", "BE", "BM", "BR", "CH", "CN", "CZ", "DE", "ES", "FI", "FR", "GB", "GR", "HK", "HU", "IN", "IT", "JP", "LU", "LV", "MA", "NL", "NO", "PL", "PT", "RO", "SK", "TR", "TW", "US", "UY"}
def get_certificate_info(url: str) -> tuple[str, int]:
    """
    Returns the issuer and certificate age if found, (None, None) otherwise.
    """
    try:
        if not url.startswith("https://"):
            raise ValueError("URL must use HTTPS protocol")
        hostname = url.split("https://")[1].split("/")[0]
        context = ssl.create_default_context()
        # Open the TLS connection inside context managers so the sockets are always closed.
        with socket.create_connection((hostname, 443)) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssl_conn:
                cert = ssl_conn.getpeercert()
        if 'notAfter' not in cert:
            raise ValueError("Certificate information not found")
        issuer = cert['issuer'][0][0][1]
        not_after = cert['notAfter']
        not_after_date = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
        # Measured against the certificate's expiry date ('notAfter'), so the value is
        # negative while the certificate is still valid.
        certificate_age = (datetime.now() - not_after_date).days
        if issuer not in existing_issuers:
            issuer = 'other'
        return issuer, certificate_age
    except Exception as e:
        print(f"get_certificate_info error: {str(e)}")
        return None, None
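# Example usage (a sketch, not part of the pipeline; requires network access, and
# the issuer string returned depends on the server's certificate):
#
#   issuer, cert_age = get_certificate_info("https://example.com")
#   if issuer is not None:
#       print(issuer, cert_age)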
def age_of_domain(w: whois.WhoisEntry) -> int:
    """
    Returns the age of the domain in days, -1 if no creation or updated date is available, None on error.
    """
    try:
        creation_date = w.creation_date
        if creation_date is None:
            # Domain creation date is not available; fall back to the updated_date
            updated_date = w.updated_date
            if updated_date is None:
                return -1
            if isinstance(updated_date, list):
                creation_date = min(updated_date)
            else:
                creation_date = updated_date
        if isinstance(creation_date, list):
            creation_date = min(creation_date)
        num_days = (datetime.now() - creation_date).days
        return num_days
    except Exception as e:
        print('age_of_domain error: ' + str(e))
        return None
def abnormal_url(url: str, w: whois.WhoisEntry) -> int:
    """
    Returns 1 if the WHOIS host name is not in the URL, 0 otherwise.
    """
    domain_name = w.domain_name
    if isinstance(domain_name, list):
        domain_name = domain_name[0]
    if domain_name is None:
        return 1
    host_name = domain_name.lower().split('.')[0]
    if host_name not in url.lower():
        return 1
    else:
        return 0
def dns_record(domain: str) -> tuple[int, int]:
    """
    Returns the TTL and the number of IP addresses as a tuple of integers.
    Returns None, None if the DNS record is not found.
    """
try:
answers = dns.resolver.resolve(domain)
TTL = answers.rrset.ttl
IP_addresses = len(answers)
return TTL, IP_addresses
except dns.resolver.NXDOMAIN:
return None, None
except Exception as e:
print(f"dns_record error: {str(e)}")
return None, None
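# Example usage (a sketch; requires network access and the dnspython resolver,
# and the TTL / address count vary by domain and resolver cache):
#
#   ttl, ip_count = dns_record("example.com")
#   if ttl is not None:
#       print(ttl, ip_count)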
def script_link_percentage(soup: BeautifulSoup) -> tuple[float, float]:
    """
    Returns the fractions of linked script and link tags (script tags with 'src',
    link tags with 'href') relative to the total number of such linked tags.
    """
    script_tags = soup.find_all('script')
    link_tags = soup.find_all('link')
    script_links = sum(1 for tag in script_tags if tag.has_attr('src'))
    link_links = sum(1 for tag in link_tags if tag.has_attr('href'))
    total_links = script_links + link_links
    if total_links == 0:
        return 0, 0
    script_percentage = script_links / total_links
    link_percentage = link_links / total_links
    return script_percentage, link_percentage
def request_url_percentage(soup: BeautifulSoup, domain: str) -> float:
    """
    Returns the fraction of linked resources (anchors, images, videos, audio) whose
    domain matches the page's own domain, out of all resources with an explicit domain.
    Returns 1 if every resource link is relative (no explicit domain).
    """
    links = [link.get('href') for link in soup.find_all('a')]
    images = [img.get('src') for img in soup.find_all('img')]
    videos = [video.get('src') for video in soup.find_all('video')]
    sounds = [sound.get('src') for sound in soup.find_all('audio')]

    resource_domains = []
    for link in links + images + videos + sounds:
        if link is None:
            continue
        parsed_domain = urlparse(link).netloc
        if parsed_domain != '':
            resource_domains.append(parsed_domain)

    total_links = len(resource_domains)
    if total_links == 0:
        return 1
    domain_counts = Counter(resource_domains)
    # Share of absolute resource links that stay on the page's own domain.
    return domain_counts[domain] / total_links
def domain_registration_length(w: whois.WhoisEntry) -> int:
    """
    Returns the number of days until the domain registration expires, None on error.
    """
    try:
        expiration_date = w.expiration_date
        if isinstance(expiration_date, list):
            expiration_date = expiration_date[0]
        if expiration_date is not None:
            time_to_expire = (expiration_date - datetime.now()).days
            return time_to_expire
        else:
            return 0
    except Exception as e:
        print('domain_registration_length error: ' + str(e))
        return None
def make_request(url: str, headers: dict, timeout: int, retries: int) -> requests.Response:
for i in range(retries):
try:
response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
retry_delay = 2**i
print(f'\033[34mRequestException for {url}: {e}. Retrying in {retry_delay} seconds...\033[0m')
time.sleep(retry_delay)
except Exception as e:
print(f'\033[31mError making request for {url}: {e}\033[0m')
return None
print(f'\033[31mFailed to make request after {retries} retries.\033[0m')
return None
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("FredZhang7/malphish-eater-v1")
model = AutoModelForSequenceClassification.from_pretrained("FredZhang7/malphish-eater-v1")
model.to(device)  # move the model to the same device the input tensors are sent to
model.eval()
def predict_url_batch(urls: list[str]) -> list[float]:
"""Return the confidence of the urls being malicious."""
encodings = tokenizer.batch_encode_plus(
urls,
add_special_tokens=True,
padding="max_length",
truncation=True,
max_length=208,
return_tensors="pt"
)
input_ids = encodings["input_ids"].to(device)
attention_mask = encodings["attention_mask"].to(device)
    with torch.no_grad():  # inference only; gradients are not needed
        outputs = model(input_ids, attention_mask=attention_mask)[0]
    probs = torch.nn.functional.softmax(outputs, dim=1)
    confidences = probs[:, 1].tolist()
return confidences
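# Example usage (a sketch; downloads the model on first run, and the second URL
# below is a made-up illustration, not a known malicious site):
#
#   scores = predict_url_batch(["https://example.com", "http://login-verify.example.net"])
#   # each score is the softmax probability of the malicious class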
# Page-rank API endpoint and key, read from environment variables.
page_rank_api_url = os.environ['PAGE_RANK_URL']
api_key = os.environ['PAGE_RANK_KEY']
def page_ranks(urls: list[str]) -> list[float]:
"""
Returns the page ranks of the urls in a list of floats
"""
if not urls:
return []
urls = [urlparse(url).netloc for url in urls]
page_ranks = []
    chunk_size = 100
    total_chunks = (len(urls) + chunk_size - 1) // chunk_size
    for i in range(0, len(urls), chunk_size):
        start_time = time.time()
        chunk = urls[i:i+chunk_size]
        params = {
            'domains[]': chunk
        }
        headers = {
            'API-OPR': api_key
        }
        response = requests.get(page_rank_api_url, params=params, headers=headers)
        if response.status_code == 200:
            output = response.json()
            for obj in output['response']:
                if obj['status_code'] == 200:
                    page_ranks.append(obj['page_rank_decimal'])
                else:
                    page_ranks.append(None)
        else:
            print("\033[91m")
            print("Error: " + str(response.status_code))
            print(response.text)
            print("\033[0m")
            # Keep the result aligned with the input URLs even when a chunk fails.
            page_ranks.extend([None] * len(chunk))
        print(f"Chunk {i//chunk_size + 1} of {total_chunks} done in {time.time() - start_time} seconds.")
    return page_ranks
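# Example usage (a sketch; assumes PAGE_RANK_URL points at a page-rank API endpoint
# compatible with the 'API-OPR' header and PAGE_RANK_KEY holds a valid key):
#
#   ranks = page_ranks(["https://example.com", "https://example.org"])
#   # one entry per input URL; None where the API returned no data or a chunk failed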
def extract_features(urls: list[str]) -> pd.DataFrame:
df = pd.DataFrame(columns=['issuer', 'certificate_age', 'request_url_percentage', 'script_percentage', 'link_percentage', 'TTL', 'ip_address_count', 'count_domain_occurrences', 'domain_registration_length', 'abnormal_url', 'age_of_domain', 'page_rank_decimal', 'bert_confidence'])
bert_confidences = []
for i in range(0, len(urls), 64):
batch_urls = urls[i:i+64]
batch_confidences = predict_url_batch(batch_urls)
bert_confidences.extend(batch_confidences)
page_rank_decimals = page_ranks(urls)
if len(page_rank_decimals) != len(urls):
raise ValueError("Page ranks and urls must be the same length")
for i in range(len(urls)):
url = urls[i]
response = make_request(url, headers, timeout=10, retries=2)
if response is None:
continue
issuer, certificate_age = get_certificate_info(url)
try:
soup = BeautifulSoup(response.content, 'html.parser')
script_percentage, link_percentage = script_link_percentage(soup)
except Exception as e:
            print('BeautifulSoup parsing error: ' + str(e))
continue
try:
parsed_url = urlparse(url)
domain = parsed_url.netloc
request_url_percentage_value = request_url_percentage(soup, domain)
TTL, ip_address_count = dns_record(domain)
count_domain_occurrences_value = count_domain_occurrences(soup, domain)
except Exception as e:
            print('URL parsing error: ' + str(e))
continue
try:
w = whois.whois(domain)
            domain_registration_length_value = domain_registration_length(w)
abnormal_url_value = abnormal_url(url, w)
age_of_domain_value = age_of_domain(w)
except Exception as e:
print('whois error: ' + str(e))
            domain_registration_length_value = None
abnormal_url_value = None
age_of_domain_value = None
        row = [issuer, certificate_age, request_url_percentage_value, script_percentage, link_percentage, TTL, ip_address_count, count_domain_occurrences_value, domain_registration_length_value, abnormal_url_value, age_of_domain_value, page_rank_decimals[i], bert_confidences[i]]
df.loc[len(df)] = row
return df
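# Minimal end-to-end sketch (assumes network access, the PAGE_RANK_URL / PAGE_RANK_KEY
# environment variables above, and reachable example URLs; the CSV filename is arbitrary):
if __name__ == "__main__":
    sample_urls = ["https://example.com", "https://example.org"]
    features = extract_features(sample_urls)
    print(features.head())
    features.to_csv("extracted_features.csv", index=False)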