import os
import random
import socket
import ssl
import time
from collections import Counter
from datetime import datetime
from urllib.parse import urlparse

import dns.resolver
import pandas as pd
import requests
import whois
from bs4 import BeautifulSoup
def generate_user_agent() -> str:
    """Returns a randomized Chrome desktop User-Agent string."""
    a = random.randint(63, 89)
    b = random.randint(1, 3200)
    c = random.randint(1, 140)
    user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{a}.0.{b}.{c} Safari/537.3'
    return user_agent
headers = {
    'User-Agent': generate_user_agent()
}
def count_domain_occurrences(soup: BeautifulSoup, domain: str) -> int:
    """
    Returns the number of occurrences of the domain in the website's page source.
    """
    try:
        domain_count = soup.prettify().count(domain)
        return domain_count
    except Exception as e:
        print(f"count_domain_occurrences: {str(e)}")
        return 0
# Known certificate issuer values; any issuer outside this set is mapped to 'other'.
existing_issuers = {"ACCVCA-120", "AT", "AU", "Apple Public Server ECC CA 12 - G1", "Apple Public Server RSA CA 12 - G1", "Atos TrustedRoot Server-CA 2019", "BE", "BM", "BR", "CH", "CN", "CZ", "DE", "ES", "FI", "FR", "GB", "GR", "HK", "HU", "IN", "IT", "JP", "LU", "LV", "MA", "NL", "NO", "PL", "PT", "RO", "SK", "TR", "TW", "US", "UY"}
def get_certificate_info(url: str) -> tuple[str, int]:
    """
    Returns the issuer and age (in days) of the site's TLS certificate.
    Returns None, None if the certificate cannot be retrieved.
    """
    try:
        if not url.startswith("https://"):
            raise ValueError("URL must use HTTPS protocol")
        hostname = url.split("https://")[1].split("/")[0]
        context = ssl.create_default_context()
        with socket.create_connection((hostname, 443)) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssl_conn:
                cert = ssl_conn.getpeercert()
        if 'notBefore' not in cert:
            raise ValueError("Certificate information not found")
        # First RDN of the issuer field (typically the country code or CA name).
        issuer = cert['issuer'][0][0][1]
        # Age is measured from the start of the certificate's validity period.
        not_before = datetime.strptime(cert['notBefore'], '%b %d %H:%M:%S %Y %Z')
        certificate_age = (datetime.now() - not_before).days
        if issuer not in existing_issuers:
            issuer = 'other'
        return issuer, certificate_age
    except Exception as e:
        print(f"get_certificate_info error: {str(e)}")
        return None, None
def age_of_domain(w: whois.WhoisEntry) -> int:
    """
    Returns the age of the domain in days, -1 if no creation or updated
    date is available, None on error.
    """
    try:
        creation_date = w.creation_date
        if creation_date is None:
            # Domain creation date is not available; fall back to updated_date.
            updated_date = w.updated_date
            if updated_date is None:
                return -1
            if isinstance(updated_date, list):
                creation_date = min(updated_date)
            else:
                creation_date = updated_date
        if isinstance(creation_date, list):
            creation_date = min(creation_date)
        num_days = (datetime.now() - creation_date).days
        return num_days
    except Exception as e:
        print('age_of_domain error: ' + str(e))
        return None
def abnormal_url(url: str, w: whois.WhoisEntry) -> int:
    """
    Returns 1 if the WHOIS host name is not found in the URL, 0 otherwise.
    """
    domain_name = w.domain_name
    if isinstance(domain_name, list):
        domain_name = domain_name[0]
    host_name = domain_name.split('.')[0].lower()
    if host_name not in url.lower():
        return 1
    else:
        return 0
def dns_record(domain: str) -> tuple[int, int]:
    """
    Returns the record TTL and the number of resolved IP addresses.
    Returns None, None if no DNS record is found.
    """
    try:
        answers = dns.resolver.resolve(domain)
        TTL = answers.rrset.ttl
        IP_addresses = len(answers)
        return TTL, IP_addresses
    except dns.resolver.NXDOMAIN:
        return None, None
    except Exception as e:
        print(f"dns_record error: {str(e)}")
        return None, None
def script_link_percentage(soup: BeautifulSoup) -> tuple[float, float]:
    """
    Returns the fractions of script and link tags, respectively, among all
    script/link tags that reference an external resource.
    """
    script_tags = soup.find_all('script')
    link_tags = soup.find_all('link')
    script_links = sum(1 for tag in script_tags if tag.has_attr('src'))
    link_links = sum(1 for tag in link_tags if tag.has_attr('href'))
    total_links = script_links + link_links
    if total_links == 0:
        return 0, 0
    script_percentage = script_links / total_links
    link_percentage = link_links / total_links
    return script_percentage, link_percentage
def request_url_percentage(soup: BeautifulSoup, domain: str) -> float:
    """
    Returns the fraction of linked resources (anchors, images, video, audio)
    whose absolute URL points to the page's own domain. Returns 1 if the
    page has no absolute resource URLs.
    """
    links = [link.get('href') for link in soup.find_all('a')]
    images = [img.get('src') for img in soup.find_all('img')]
    videos = [video.get('src') for video in soup.find_all('video')]
    sounds = [sound.get('src') for sound in soup.find_all('audio')]
    resource_domains = []
    for link in links + images + videos + sounds:
        if link is None:
            continue
        parsed_domain = urlparse(link).netloc
        if parsed_domain != '':
            resource_domains.append(parsed_domain)
    total_links = len(resource_domains)
    if total_links == 0:
        return 1
    domain_counts = Counter(resource_domains)
    return domain_counts[domain] / total_links
def domain_registration_length(w: whois.WhoisEntry) -> int:
    """
    Returns the number of days until the domain registration expires,
    0 if no expiration date is available, None on error.
    """
    try:
        expiration_date = w.expiration_date
        if isinstance(expiration_date, list):
            expiration_date = expiration_date[0]
        if expiration_date is not None:
            time_to_expire = (expiration_date - datetime.now()).days
            return time_to_expire
        else:
            return 0
    except Exception as e:
        print('domain_registration_length error: ' + str(e))
        return None
def make_request(url: str, headers: dict, timeout: int, retries: int) -> requests.Response:
    """
    GETs the URL with exponential backoff between retries.
    Returns None if every attempt fails.
    """
    for i in range(retries):
        try:
            response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            retry_delay = 2**i  # exponential backoff: 1s, 2s, 4s, ...
            print(f'\033[34mRequestException for {url}: {e}. Retrying in {retry_delay} seconds...\033[0m')
            time.sleep(retry_delay)
        except Exception as e:
            print(f'\033[31mError making request for {url}: {e}\033[0m')
            return None
    print(f'\033[31mFailed to make request after {retries} retries.\033[0m')
    return None
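# A minimal usage sketch for make_request; example.com is a hypothetical
# placeholder target, not part of the original pipeline.
def _demo_make_request() -> None:
    response = make_request("https://example.com", headers, timeout=10, retries=3)
    if response is not None:
        print(response.status_code, len(response.content))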
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("FredZhang7/malphish-eater-v1")
model = AutoModelForSequenceClassification.from_pretrained("FredZhang7/malphish-eater-v1")
model.to(device)
model.eval()
def predict_url_batch(urls: list[str]) -> list[float]:
    """Returns the model's confidence that each URL is malicious."""
    encodings = tokenizer.batch_encode_plus(
        urls,
        add_special_tokens=True,
        padding="max_length",
        truncation=True,
        max_length=208,
        return_tensors="pt"
    )
    input_ids = encodings["input_ids"].to(device)
    attention_mask = encodings["attention_mask"].to(device)
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)[0]
    probs = torch.nn.functional.softmax(outputs, dim=1)
    confidences = probs[:, 1].tolist()
    return confidences
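# A minimal sketch of calling the classifier on its own; the sample URLs are
# hypothetical placeholders, not taken from the original dataset.
def _demo_predict_url_batch() -> None:
    sample_urls = ["https://example.com/login", "http://secure-update.example.net/verify"]
    for sample_url, confidence in zip(sample_urls, predict_url_batch(sample_urls)):
        print(f"{sample_url}: {confidence:.4f}")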
page_rank_api_url = os.environ['PAGE_RANK_URL']
api_key = os.environ['PAGE_RANK_KEY']
def page_ranks(urls: list[str]) -> list[float]:
    """
    Returns the page ranks of the urls as a list of floats,
    with None for domains the API cannot score.
    """
    if not urls:
        return []
    domains = [urlparse(u).netloc for u in urls]
    page_ranks = []
    chunk_size = 100
    for i in range(0, len(domains), chunk_size):
        start_time = time.time()
        chunk = domains[i:i+chunk_size]
        params = {
            'domains[]': chunk
        }
        headers = {
            'API-OPR': api_key
        }
        response = requests.get(page_rank_api_url, params=params, headers=headers)
        if response.status_code == 200:
            output = response.json()
            for obj in output['response']:
                if obj['status_code'] == 200:
                    page_ranks.append(obj['page_rank_decimal'])
                else:
                    page_ranks.append(None)
        else:
            print("\033[91m")
            print("Error: " + str(response.status_code))
            print(response.text)
            print("\033[0m")
            # Keep the output aligned with the input so callers can zip them.
            page_ranks.extend([None] * len(chunk))
        print(f"Chunk {i//chunk_size + 1} of {(len(domains) - 1)//chunk_size + 1} done in {time.time() - start_time:.2f} seconds.")
    return page_ranks
def extract_features(urls: list[str]) -> pd.DataFrame:
    df = pd.DataFrame(columns=['issuer', 'certificate_age', 'request_url_percentage', 'script_percentage', 'link_percentage', 'TTL', 'ip_address_count', 'count_domain_occurrences', 'domain_registration_length', 'abnormal_url', 'age_of_domain', 'page_rank_decimal', 'bert_confidence'])
    # Score all URLs with the classifier in batches of 64.
    bert_confidences = []
    for i in range(0, len(urls), 64):
        batch_urls = urls[i:i+64]
        batch_confidences = predict_url_batch(batch_urls)
        bert_confidences.extend(batch_confidences)
    page_rank_decimals = page_ranks(urls)
    if len(page_rank_decimals) != len(urls):
        raise ValueError("Page ranks and urls must be the same length")
    for i in range(len(urls)):
        url = urls[i]
        response = make_request(url, headers, timeout=10, retries=2)
        if response is None:
            continue  # unreachable URLs are skipped entirely
        issuer, certificate_age = get_certificate_info(url)
        try:
            soup = BeautifulSoup(response.content, 'html.parser')
            script_percentage, link_percentage = script_link_percentage(soup)
        except Exception as e:
            print('soup error, double check your code: ' + str(e))
            continue
        try:
            parsed_url = urlparse(url)
            domain = parsed_url.netloc
            request_url_percentage_value = request_url_percentage(soup, domain)
            TTL, ip_address_count = dns_record(domain)
            count_domain_occurrences_value = count_domain_occurrences(soup, domain)
        except Exception as e:
            print('urlparse error, double check your code: ' + str(e))
            continue
        try:
            w = whois.whois(domain)
            domain_registration_length_value = domain_registration_length(w)
            abnormal_url_value = abnormal_url(url, w)
            age_of_domain_value = age_of_domain(w)
        except Exception as e:
            print('whois error: ' + str(e))
            domain_registration_length_value = None
            abnormal_url_value = None
            age_of_domain_value = None
        row = [issuer, certificate_age, request_url_percentage_value, script_percentage, link_percentage, TTL, ip_address_count, count_domain_occurrences_value, domain_registration_length_value, abnormal_url_value, age_of_domain_value, page_rank_decimals[i], bert_confidences[i]]
        df.loc[len(df)] = row
    return df