import os
import dns.resolver, dns.rdatatype
import requests
from bs4 import BeautifulSoup
from collections import Counter
import whois
from datetime import datetime
import time
import csv
import ssl
import socket
from urllib.parse import urlparse
import OpenSSL.crypto
import pandas as pd
import random


def generate_user_agent() -> str:
    """Build a randomized Chrome-like User-Agent string."""
    a = random.randint(63, 89)
    b = random.randint(1, 3200)
    c = random.randint(1, 140)
    user_agent = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{a}.0.{b}.{c} Safari/537.3'
    return user_agent


headers = {
    'User-Agent': generate_user_agent()
}


def count_domain_occurrences(soup: BeautifulSoup, domain: str) -> int:
    """Returns the number of occurrences of the domain in the website's page source."""
    try:
        return soup.prettify().count(domain)
    except Exception as e:
        print(f"count_domain_occurrences: {str(e)}")
        return 0


existing_issuers = {"ACCVCA-120", "AT", "AU", "Apple Public Server ECC CA 12 - G1",
                    "Apple Public Server RSA CA 12 - G1", "Atos TrustedRoot Server-CA 2019",
                    "BE", "BM", "BR", "CH", "CN", "CZ", "DE", "ES", "FI", "FR", "GB", "GR",
                    "HK", "HU", "IN", "IT", "JP", "LU", "LV", "MA", "NL", "NO", "PL", "PT",
                    "RO", "SK", "TR", "TW", "US", "UY"}


def get_certificate_info(url: str) -> tuple[str, int]:
    """Returns the issuer and age of the certificate if found, None, None otherwise."""
    try:
        if not url.startswith("https://"):
            raise ValueError("URL must use HTTPS protocol")
        hostname = url.split("https://")[1].split("/")[0]
        ip_addresses = socket.getaddrinfo(hostname, 443)
        ip_address = ip_addresses[0][4][0]
        context = ssl.create_default_context()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssl_conn = context.wrap_socket(sock, server_hostname=hostname)
        ssl_conn.connect((ip_address, 443))
        cert = ssl_conn.getpeercert()
        ssl_conn.close()
        if 'notAfter' not in cert:
            raise ValueError("Certificate information not found")
        issuer = cert['issuer'][0][0][1]
        not_after = cert['notAfter']
        not_after_date = datetime.strptime(not_after, '%b %d %H:%M:%S %Y %Z')
        # Measured against the expiry date, so this is negative while the certificate is still valid.
        certificate_age = (datetime.now() - not_after_date).days
        if issuer not in existing_issuers:
            issuer = 'other'
        return issuer, certificate_age
    except Exception as e:
        print(f"get_certificate_info error: {str(e)}")
        return None, None


def age_of_domain(w: whois.WhoisEntry) -> int:
    """Returns the age of the domain in days, None on error."""
    try:
        creation_date = w.creation_date
        if creation_date is None:
            # Domain creation date is not available, try using updated_date as a fallback
            updated_date = w.updated_date
            if updated_date is None:
                return -1
            if isinstance(updated_date, list):
                creation_date = min(updated_date)
            else:
                creation_date = updated_date
        if isinstance(creation_date, list):
            creation_date = min(creation_date)
        return (datetime.now() - creation_date).days
    except Exception as e:
        print('age_of_domain error: ' + str(e))
        return None


def abnormal_url(url: str, w: whois.WhoisEntry) -> int:
    """Returns 1 if the registered hostname is not in the URL, 0 otherwise."""
    # python-whois exposes the registered name as `domain_name` (sometimes a list).
    domain_name = w.domain_name
    if isinstance(domain_name, list):
        domain_name = domain_name[0]
    host_name = domain_name.split('.')[0].lower()
    if host_name not in url.lower():
        return 1
    else:
        return 0


def dns_record(domain: str) -> tuple[int, int]:
    """Returns the TTL and IP address count as a tuple of integers.
    Returns None, None if no DNS record is found.
    """
""" try: answers = dns.resolver.resolve(domain) TTL = answers.rrset.ttl IP_addresses = len(answers) return TTL, IP_addresses except dns.resolver.NXDOMAIN: return None, None except Exception as e: print(f"dns_record error: {str(e)}") return None, None def script_link_percentage(soup: BeautifulSoup) -> tuple[float, float]: """ Returns the percentage of meta, script, and link tags that have a link """ script_tags = soup.find_all('script') link_tags = soup.find_all('link') script_links = sum([1 for tag in script_tags if tag.has_attr('src')]) link_links = sum([1 for tag in link_tags if tag.has_attr('href')]) total_links = script_links + link_links if total_links == 0: return 0, 0, 0 script_percentage = (script_links / total_links) link_percentage = (link_links / total_links) return script_percentage, link_percentage def request_url_percentage(soup: BeautifulSoup, domain: str) -> float: """ Returns the percentage of external domains in the URL """ links = [link.get('href') for link in soup.find_all('a')] images = [img.get('src') for img in soup.find_all('img')] videos = [video.get('src') for video in soup.find_all('video')] sounds = [sound.get('src') for sound in soup.find_all('audio')] external_links = [] for link in links + images + videos + sounds: if link is None: continue parsed_domain = urlparse(link).netloc if parsed_domain != '' and parsed_domain != domain: external_links.append(link) external_domains = [urlparse(link).netloc for link in external_links] domain_counts = Counter(external_domains) total_links = len(external_domains) if total_links == 0: return 1 external_links_count = domain_counts[domain] return (external_links_count / total_links) def domain_registeration_length(w: whois.WhoisEntry) -> int: """" Returns the number of days since the domain was registered, None if error """ try: expiration_date = w.expiration_date if type(expiration_date) == list: expiration_date = expiration_date[0] if expiration_date is not None: time_to_expire = (expiration_date - datetime.now()).days return time_to_expire else: return 0 except Exception as e: print('domain_registeration_length error: ' + str(e)) return None def make_request(url: str, headers: dict, timeout: int, retries: int) -> requests.Response: for i in range(retries): try: response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True) response.raise_for_status() return response except requests.exceptions.RequestException as e: retry_delay = 2**i print(f'\033[34mRequestException for {url}: {e}. 
            time.sleep(retry_delay)
        except Exception as e:
            print(f'\033[31mError making request for {url}: {e}\033[0m')
            return None
    print(f'\033[31mFailed to make request after {retries} retries.\033[0m')
    return None


from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("FredZhang7/malphish-eater-v1")
model = AutoModelForSequenceClassification.from_pretrained("FredZhang7/malphish-eater-v1")
model.to(device)
model.eval()


def predict_url_batch(urls: list[str]) -> list[float]:
    """Return the confidence of each URL being malicious."""
    encodings = tokenizer.batch_encode_plus(
        urls,
        add_special_tokens=True,
        padding="max_length",
        truncation=True,
        max_length=208,
        return_tensors="pt"
    )
    input_ids = encodings["input_ids"].to(device)
    attention_mask = encodings["attention_mask"].to(device)
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)[0]
    probs = torch.nn.functional.softmax(outputs, dim=1)
    confidences = probs[:, 1].tolist()
    return confidences


page_rank_url = os.environ['PAGE_RANK_URL']
api_key = os.environ['PAGE_RANK_KEY']


def page_ranks(urls: list[str]) -> list[float]:
    """Returns the page ranks of the URLs as a list of floats."""
    if not urls:
        return []
    domains = [urlparse(u).netloc for u in urls]
    ranks = []
    chunk_size = 100
    for i in range(0, len(domains), chunk_size):
        start_time = time.time()
        chunk = domains[i:i + chunk_size]
        params = {
            'domains[]': chunk
        }
        opr_headers = {
            'API-OPR': api_key
        }
        response = requests.get(page_rank_url, params=params, headers=opr_headers)
        output = response.json()
        if response.status_code == 200:
            for obj in output['response']:
                if obj['status_code'] == 200:
                    ranks.append(obj['page_rank_decimal'])
                else:
                    ranks.append(None)
        else:
            print("\033[91m")
            print("Error: " + str(response.status_code))
            print(response.text)
            print("\033[0m")
            # Keep the output aligned with the input when a whole chunk fails.
            ranks.extend([None] * len(chunk))
        print(f"Chunk {i//chunk_size + 1} of {len(domains)//chunk_size + 1} done in {time.time() - start_time} seconds.")
    return ranks


def extract_features(urls: list[str]) -> pd.DataFrame:
    df = pd.DataFrame(columns=['issuer', 'certificate_age', 'request_url_percentage', 'script_percentage',
                               'link_percentage', 'TTL', 'ip_address_count', 'count_domain_occurrences',
                               'domain_registration_length', 'abnormal_url', 'age_of_domain',
                               'page_rank_decimal', 'bert_confidence'])

    bert_confidences = []
    for i in range(0, len(urls), 64):
        batch_urls = urls[i:i + 64]
        batch_confidences = predict_url_batch(batch_urls)
        bert_confidences.extend(batch_confidences)

    page_rank_decimals = page_ranks(urls)
    if len(page_rank_decimals) != len(urls):
        raise ValueError("Page ranks and urls must be the same length")

    for i in range(len(urls)):
        url = urls[i]
        response = make_request(url, headers, timeout=10, retries=2)
        if response is None:
            continue
        issuer, certificate_age = get_certificate_info(url)
        try:
            soup = BeautifulSoup(response.content, 'html.parser')
            script_percentage, link_percentage = script_link_percentage(soup)
        except Exception as e:
            print('soup error, double check your code: ' + str(e))
            continue
        try:
            parsed_url = urlparse(url)
            domain = parsed_url.netloc
            request_url_percentage_value = request_url_percentage(soup, domain)
            TTL, ip_address_count = dns_record(domain)
            count_domain_occurrences_value = count_domain_occurrences(soup, domain)
        except Exception as e:
            print('urlparse error, double check your code: ' + str(e))
            continue
        try:
            w = whois.whois(domain)
            domain_registration_length_value = domain_registration_length(w)
            abnormal_url_value = abnormal_url(url, w)
            age_of_domain_value = age_of_domain(w)
        except Exception as e:
            print('whois error: ' + str(e))
            domain_registration_length_value = None
            abnormal_url_value = None
            age_of_domain_value = None
        row = [issuer, certificate_age, request_url_percentage_value, script_percentage, link_percentage,
               TTL, ip_address_count, count_domain_occurrences_value, domain_registration_length_value,
               abnormal_url_value, age_of_domain_value, page_rank_decimals[i], bert_confidences[i]]
        df.loc[len(df)] = row
    return df
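

# --- Example usage: a minimal, illustrative sketch, not part of the feature pipeline above ---
# Assumes PAGE_RANK_URL and PAGE_RANK_KEY are set in the environment (e.g. an
# Open PageRank-style endpoint and API key) and that the sample URLs are reachable.
# The URLs and output filename below are placeholders.
if __name__ == "__main__":
    sample_urls = [
        "https://example.com",
        "https://www.wikipedia.org",
    ]
    features = extract_features(sample_urls)
    print(features.head())
    # Persist the extracted features for later inspection or model training.
    features.to_csv("url_features.csv", index=False)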