# Import libraries
import hashlib
import json
import os
import platform
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

import matplotlib
import pandas as pd
import psutil
import requests
import streamlit as st
from geopy.distance import geodesic as GD
from geopy.geocoders import Nominatim
from getmac import get_mac_address as gma
from requests import get
from streamlit_extras.switch_page_button import switch_page

# from face_verification_helper import face_verfication

# Security: passlib, hashlib, bcrypt, scrypt (hashlib is the one in use).
# DB Management: sqlite3.

# SQLite database file used by every DB helper in this module.
DB_PATH = 'data.db'

# Timeout (seconds) applied to outbound HTTP calls so a stalled API cannot
# hang the Streamlit script forever.
HTTP_TIMEOUT = 10

# Fields of one login record, in the column order of the `login` table
# (right after the auto-increment `login_id`).
LOGIN_FIELDS = (
    'username', 'login_time',
    'device_name', 'device_uuid', 'mac_address', 'device_vendor',
    'device_model', 'device_ram',
    'ip_v4', 'ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon',
    'isp_name', 'isp_org',
    'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
    'lat', 'lon', 'suburb', 'district', 'city', 'country',
)


def make_hashes(password):
    """Return the hex SHA-256 digest of *password*.

    NOTE(review): this is an unsalted fast hash. A password KDF (bcrypt,
    scrypt, ...) would be safer, but switching would invalidate the hashes
    already stored, so it is only flagged here.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def check_hashes(password, hashed_text):
    """Return *hashed_text* if it is the hash of *password*, else ``False``."""
    if make_hashes(password) == hashed_text:
        return hashed_text
    return False


# DB Functions
# Create table store username and password
def create_user_table():
    """Create the `users` table if it does not exist yet."""
    # closing() guarantees the connection is released; the original only
    # closed the cursor and leaked the connection.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS users('
            'user_id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'username TEXT NOT NULL, password TEXT NOT NULL)'
        )
        conn.commit()


def add_user_data(username, password):
    """Insert one (username, password) row into `users`.

    *password* is stored exactly as given.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            'INSERT INTO users(username, password) VALUES (?,?)',
            (username, password),
        )
        conn.commit()


def login_user(username, password):
    """Return every `users` row matching *username* and *password*.

    *password* is compared verbatim with the stored value, so it must be in
    the same form that was passed to ``add_user_data``. An empty list means
    no match.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.execute(
            'SELECT * FROM users WHERE username = ? AND password = ?',
            (username, password),
        )
        return cursor.fetchall()


# def view_all_users():
#     # Access database
#     conn = sqlite3.connect('data.db')
#     c = conn.cursor()
#     c.execute('SELECT * FROM users')
#     data = c.fetchall()
#     c.close()
#     return data


# Export data to CSV
def export_csv():
    """Dump the `login` and `users` tables to login.csv and users.csv."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Export table login
        pd.read_sql_query('SELECT * FROM login', conn).to_csv(
            'login.csv', index=False)
        # Export table users
        pd.read_sql_query('SELECT * FROM users', conn).to_csv(
            'users.csv', index=False)


# Create table to store login data
def create_login_table():
    """Create the `login` table if it does not exist yet."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS login('
            'login_id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'username TEXT NOT NULL, login_time TEXT NOT NULL, '
            'device_name TEXT, device_uuid TEXT, mac_address TEXT, '
            'device_vendor TEXT, device_model TEXT, device_ram TEXT, '
            'ip_v4 TEXT, ip_country TEXT, ip_region TEXT, ip_city TEXT, '
            'ip_lat TEXT, ip_lon TEXT, isp_name TEXT, isp_org TEXT, '
            'is_vpn TEXT, is_proxy TEXT, is_tor TEXT, is_relay TEXT, '
            'lat TEXT, lon TEXT, suburb TEXT, district TEXT, city TEXT, '
            'country TEXT)'
        )
        conn.commit()


# Add login data to database
def add_login_data(user_dict):
    """Store one login record built from *user_dict* in the `login` table.

    Missing keys are stored as empty strings (see ``get_from_user_dict``).
    """
    # Create login table if not existed
    create_login_table()

    values = get_from_user_dict(user_dict)
    # Column names come from the module constant, never from user input,
    # so building the statement from them is safe; values stay bound.
    columns = ', '.join(LOGIN_FIELDS)
    placeholders = ', '.join('?' * len(LOGIN_FIELDS))
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            f'INSERT INTO login({columns}) VALUES ({placeholders})', values)
        conn.commit()


# Get login data of the user
def get_login_data(username):
    """Return all `login` rows recorded for *username* (possibly empty)."""
    # Create login table if not existed
    create_login_table()

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.execute(
            'SELECT * FROM login WHERE username = ?', (username,))
        return cursor.fetchall()


# Get data from user dictionary
def get_from_user_dict(user_dict):
    """Return the login fields of *user_dict* as a 26-tuple.

    The order is that of ``LOGIN_FIELDS``; a missing key yields ``''``.
    """
    return tuple(user_dict.get(field, '') for field in LOGIN_FIELDS)


def get_from_api(url, value="", timeout=HTTP_TIMEOUT):
    """GET ``url + value`` and return the decoded response body.

    Raises:
        Exception: if the response status is not 200.
    """
    # Use get method to fetch details from URL API
    response = get(url + value, timeout=timeout)
    if response.status_code != 200:
        raise Exception("[!] Invalid request!")
    return response.content.decode()


def get_ip_info(ip_v4):
    """Look up geo, ISP and anonymiser information for *ip_v4*.

    Returns:
        ``(ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name,
        isp_org, is_vpn, is_proxy, is_tor, is_relay)``.

    Raises:
        KeyError: if either API answer lacks an expected field.
    """
    # Get information from the ipv4
    isp = json.loads(get_from_api("http://ip-api.com/json/", ip_v4))

    ip_country = isp["country"]
    ip_region = isp["regionName"]
    ip_city = isp["city"]
    ip_lat = isp["lat"]
    ip_lon = isp["lon"]
    isp_name = isp["isp"]
    isp_org = isp["org"]

    # Detect VPN / proxy / tor
    vpn_api_key = st.secrets["vpn_api_key"]
    response = requests.get(
        "https://vpnapi.io/api/" + ip_v4,
        params={"key": vpn_api_key},
        timeout=HTTP_TIMEOUT,
    )
    security = json.loads(response.text)["security"]

    return (ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,
            security['vpn'], security['proxy'], security['tor'],
            security['relay'])


def get_location(lat, lon):
    """Reverse-geocode the string coordinates *lat*, *lon*.

    Returns:
        ``(location, suburb, district, city, country)``. The address parts
        are ``''`` when unavailable, and *location* is ``None`` when the
        geocoder finds nothing.
    """
    # Get address from given coordinate
    geolocator = Nominatim(user_agent="BAAM")
    location = geolocator.reverse(lat + "," + lon)
    if location is None:
        # No result for these coordinates: previously an AttributeError.
        return location, '', '', '', ''

    address = location.raw['address']
    suburb = address.get('suburb', '')
    # Prefer 'city_district'; fall back to 'district'.
    district = address.get('city_district', '') or address.get('district', '')
    city = address.get('city', '')
    country = address.get('country', '')
    return location, suburb, district, city, country


# def collect_data(username, result, login_time, typing_speed):
def collect_data(username, result, login_time):
    """Collect location, device and IP information for the current login.

    Args:
        username: account the login belongs to.
        result: mapping that may hold ``"GET_LOCATION"`` with ``"lat"`` and
            ``"lon"`` entries.
        login_time: time of the login, stored as given.

    Returns:
        ``(user_dict, location_text)`` where *user_dict* has one entry per
        ``LOGIN_FIELDS`` name and *location_text* is ``str(location)``, or
        ``''`` when no coordinates were supplied.
    """
    # `location` must exist even without coordinates; the original raised
    # UnboundLocalError on the final str(location) in that case.
    location = ''
    lat = lon = ''
    suburb = district = city = country = ''
    if "GET_LOCATION" in result:
        lat = str(result.get("GET_LOCATION")["lat"])
        lon = str(result.get("GET_LOCATION")["lon"])
        if lat and lon:
            location, suburb, district, city, country = get_location(lat, lon)

    # Collect device information
    device_name = platform.node()
    device_uuid = uuid.getnode()
    mac_address = gma()
    # gma() may return None; skip the vendor lookup instead of failing on
    # the string concatenation inside get_from_api.
    device_vendor = (
        get_from_api("https://api.macvendors.com/", mac_address)
        if mac_address else ''
    )
    device_model = platform.platform()
    device_ram = str(round(psutil.virtual_memory().total / (1024.0 ** 3))) + " GB"

    # Collect IP information
    ip_v4 = get_from_api('https://api.ipify.org')
    (ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,
     is_vpn, is_proxy, is_tor, is_relay) = get_ip_info(ip_v4)

    user_dict = {
        "username": username,
        "login_time": login_time,
        # "typing_speed": typing_speed,
        "device_name": device_name,
        "device_uuid": device_uuid,
        "mac_address": mac_address,
        "device_vendor": device_vendor,
        "device_model": device_model,
        "device_ram": device_ram,
        "ip_v4": ip_v4,
        "ip_country": ip_country,
        "ip_region": ip_region,
        "ip_city": ip_city,
        "ip_lat": ip_lat,
        "ip_lon": ip_lon,
        "isp_name": isp_name,
        "isp_org": isp_org,
        "is_vpn": is_vpn,
        "is_proxy": is_proxy,
        "is_tor": is_tor,
        "is_relay": is_relay,
        "lat": lat,
        "lon": lon,
        "suburb": suburb,
        "district": district,
        "city": city,
        "country": country,
    }
    return user_dict, str(location)


# Retrieve login history of the user
def get_login_history(username):
    """Return the login history of *username* as a tuple of 25 lists.

    One list per ``LOGIN_FIELDS`` entry except ``username``, in that order
    (``login_time`` first, ``country`` last). Every list has one element per
    stored login, and all are empty when there is no history.
    """
    # Row layout is (login_id, username, <25 history fields>).
    first_field = 2
    history = tuple([] for _ in range(len(LOGIN_FIELDS) - 1))
    for row in get_login_data(username):
        for offset, column in enumerate(history):
            column.append(row[first_field + offset])
    return history


def submit_test_case(user_dict, location):
    """Render the "Start test case" button and run verification on click.

    On click the verification outcome, *location* and *user_dict* are put
    into ``st.session_state`` and the app switches to the "TestPass" page
    when verification succeeds, otherwise to "InputImage".
    """
    if not st.button("Start test case"):
        return

    # Call function to verify test case with historical data
    verification = verify_user(user_dict)

    # Update location, user_dict to pass to other pages
    st.session_state['location'] = location
    st.session_state['user_dict'] = user_dict
    st.session_state['verification'] = verification

    # Passed -> sent page; failed -> image input page
    switch_page("TestPass" if verification else "InputImage")


def show_test_data(user_dict, location):
    """Display the collected login information on the current page."""
    info = dict(zip(LOGIN_FIELDS, get_from_user_dict(user_dict)))

    # Show location
    st.write('Location:', location)

    col1, col2 = st.columns(2)
    with col1:
        # Show IP info
        st.write('IP address:', info['ip_v4'])
        st.write('IP region:', info['ip_region'])
        st.write('IP city:', info['ip_city'])
        st.write('IP country:', info['ip_country'])
        st.write('Is VPN?', info['is_vpn'])
        st.write('Is Proxy?', info['is_proxy'])
        st.write('Is Tor Node?', info['is_tor'])
        st.write('Is Relay?', info['is_relay'])
    with col2:
        # Show Device
        st.write('ISP Name:', info['isp_name'])
        st.write('ISP Organisation:', info['isp_org'])
        st.write('Device Mac Address:', info['mac_address'])
        st.write('Device UUID:', info['device_uuid'])
        st.write('Device Name:', info['device_name'])
        st.write('Device Vendor:', info['device_vendor'])
        st.write('Device Model:', info['device_model'])
        st.write('Device Ram:', info['device_ram'])

    # Show Login time
    st.write('Login time:', info['login_time'])


def save_user_image(username, image, input_time):
    """Write *image* to ``img/user_image/<username>/`` and return its path.

    *image* must expose ``getbuffer()``. The file is named
    ``<username>_<int(input_time)>.jpg``.

    NOTE(review): *username* is interpolated into the path unchecked;
    confirm callers never pass path separators.
    """
    save_dir = f"img/user_image/{username}"
    save_file_path = f"{save_dir}/{username}_{int(input_time)}.jpg"
    # exist_ok avoids the check-then-create race of the original.
    os.makedirs(save_dir, exist_ok=True)
    with open(save_file_path, mode='wb') as out_file:
        out_file.write(image.getbuffer())
    return save_file_path


def read_user_image(username):
    """Return the most recent image path stored for *username*, or ``None``.

    The user's image directory is created if missing. "Most recent" is
    decided by ``os.path.getctime``.
    """
    image_dir = f"img/user_image/{username}"
    os.makedirs(image_dir, exist_ok=True)

    # splitext looks at the real extension: the original `split(".")[1]`
    # raised IndexError on extension-less files and missed every image
    # whose name (for instance via the username) contains another dot.
    image_paths = [
        os.path.join(image_dir, name)
        for name in os.listdir(image_dir)
        if os.path.splitext(name)[1].lower() in (".jpg", ".png", ".jpeg")
    ]
    if image_paths:
        return max(image_paths, key=os.path.getctime)
    return None


# @Thao: Here is to put user historical data verification logic to determine if this is the real user
# THAO LE CODE
# 0.5 to get the more
# suitable per
# get all the values with count in range ~20% less than the highest
def get_right_per(user_dict, user_db, match_value, per):
    """Promote *per* to 1 when the matched value is among the most frequent.

    Args:
        user_dict: current login data.
        user_db: DataFrame of historical logins; must have a ``login_time``
            column and the *match_value* column.
        match_value: name of the matched field, or ``""`` for no match.
        per: share of historical logins with the same value.

    Returns:
        ``0`` when nothing matched; ``1`` when the current value's count lies
        within 20% of the highest count for that field; otherwise *per*.
    """
    if match_value == "":
        return 0

    range_highest = 0.2
    # Values are compared as text: history is read back from TEXT columns
    # while the current login may carry ints (e.g. device_uuid).
    keys = user_db[match_value].astype(str)
    count_all = user_db['login_time'].groupby(keys).count()
    up = count_all.max()
    down = up * (1 - range_highest)
    current = count_all.get(str(user_dict[match_value]), 0)
    if down <= current <= up:
        return 1
    return per


# 1.1 to get the match value and percentage
def check_match_per(user_dict, user_db, check='location'):
    """Find the most specific field of a group that matches the history.

    Fields are checked from coarse to fine and the walk stops at the first
    one without a match. A ``mac_address`` miss is skipped instead, so
    ``device_uuid`` is still checked.

    Args:
        user_dict: current login data.
        user_db: DataFrame of historical logins.
        check: ``'location'``, ``'device'`` or ``'ip'``; any other value is
            treated as ``'ip'``.

    Returns:
        ``(match_value, final_per)``: the last matching field name (``''``
        if none) and its share adjusted by ``get_right_per``.
    """
    if check == 'location':
        fields_check = ['country', 'city', 'district', 'suburb']
    elif check == 'device':
        fields_check = ['device_vendor', 'device_model', 'device_name',
                        'mac_address', 'device_uuid']
    else:
        fields_check = ['ip_country', 'isp_name', 'ip_v4']

    match_value = ''
    per = 0
    total_txn = len(user_db)
    for field in fields_check:
        # Compare as text so an int device_uuid can match the TEXT value
        # coming back from the database; otherwise it never matches.
        same = user_db[field].astype(str) == str(user_dict[field])
        count = int(same.sum())
        if count > 0:
            match_value = field
            per = count / total_txn
        elif field == 'mac_address':
            continue
        else:
            break

    final_per = get_right_per(user_dict, user_db, match_value, per)
    return match_value, final_per


# 2.1 Get velocity of all transactions
def get_vel_all(row):
    """Return the travel speed (km/h) between a login and the previous one.

    *row* needs ``lat``, ``lon``, ``login_time`` and the shifted
    ``pre_lat``, ``pre_lon``, ``pre_time`` values.

    Returns:
        ``''`` when there is no previous login (previous coordinates are the
        ``(0, 0)`` fill value), ``0`` when both logins share a timestamp,
        otherwise distance / elapsed hours.
    """
    coor = (row['lat'], row['lon'])
    coor_pre = (row['pre_lat'], row['pre_lon'])
    vel = ''
    if coor_pre != (0, 0):
        dist = GD(coor, coor_pre).km
        interval = (row['login_time'] - row['pre_time']).total_seconds() / (60 * 60)
        vel = dist / interval if interval != 0 else 0
    return vel


# 2.2 get vel of the latest txn
def get_vel_txn(user_dict, user_db):
    """Return the speed (km/h) from the latest stored login to this one.

    ``user_dict['login_time']`` may be a datetime or a date-time string.
    ``user_db['login_time']`` must already hold timestamps.

    Returns:
        distance / elapsed hours, or ``0`` when no time has elapsed.
    """
    latest_txn = user_db.iloc[-1]
    coor_txn = (user_dict['lat'], user_dict['lon'])
    coor_latest = (float(latest_txn['lat']), float(latest_txn['lon']))
    dist = GD(coor_txn, coor_latest).km

    # pd.to_datetime accepts datetimes and strings alike, replacing the
    # bare-except fallback to a single strptime format.
    time_txn = pd.to_datetime(user_dict['login_time'])
    interval = (time_txn - latest_txn['login_time']).total_seconds() / (60 * 60)
    try:
        vel = dist / interval
    except ZeroDivisionError:
        vel = 0
    print(f'This is vel {vel}')
    return vel


# 2 to get score will be reduced because of jumping
def get_score_jump(user_dict, user_db):
    """Return the penalty for an implausibly fast location change.

    The speed of the current login is bucketed by the H/M/L thresholds
    (km/h). The bucket weight is multiplied by 0.1 when more than 10% of
    the historical logins exceed the low threshold. *user_db* needs the
    ``vel`` column filled by ``get_vel_all``.
    """
    threshold_vel = {'H': 600, 'M': 80, 'L': 40, 'frequency': 0.1}
    weight_vel = {'H': 20, 'M': 10, 'L': 5}
    many_vel = {'Y': 0.1, 'N': 1}

    # to get how many jumping - low or not ('' marks "no velocity")
    count_jump = sum(
        1 for vel in user_db['vel']
        if vel != "" and float(vel) > threshold_vel['L']
    )
    if count_jump > len(user_db) * threshold_vel['frequency']:
        many_jump = 'Y'
    else:
        many_jump = 'N'

    vel_txn = float(get_vel_txn(user_dict, user_db))
    for level in ('H', 'M', 'L'):
        if vel_txn > threshold_vel[level]:
            return many_vel[many_jump] * weight_vel[level]
    return 0


# 3.1 check vpn (new IP + vpn)
def get_vpn_score(user_dict, user_db):
    """Return 10 when an anonymiser flag is set and the IP is new, else 0.

    The IP counts as new when ``check_match_per(..., check='ip')`` does not
    match down to ``ip_v4``.
    """
    vpn_fields = ['is_vpn', 'is_proxy', 'is_tor', 'is_relay']
    weight_vpn = 10
    vpn_count = sum(user_dict[field] for field in vpn_fields)
    if vpn_count > 0 and check_match_per(user_dict, user_db, check='ip')[0] != 'ip_v4':
        return weight_vpn
    return 0


# 4.Get score
def get_risk_score(user_dict, user_db):
    """Compute the trust score of the current login against its history.

    ``total = device + ip + location - (jump + vpn)``. Each of the first
    three is the weight of the matched field times its match share. The
    score breakdown is stored in ``st.session_state['score_dict']``.

    Returns:
        the total score.
    """
    weight = {
        'device_uuid': 40, 'mac_address': 40, 'device_name': 30.0,
        'device_model': 20.0, 'device_vendor': 4.0,
        'ip_v4': 30, 'isp_name': 15.0, 'ip_country': 3.0,
        'suburb': 30, 'district': 22.5, 'city': 15.0, 'country': 3.0,
    }

    scores = {}
    for check in ('device', 'ip', 'location'):
        match, per = check_match_per(user_dict, user_db, check=check)
        scores[check] = weight[match] * per if match != '' else 0
    device_score = scores['device']
    ip_score = scores['ip']
    location_score = scores['location']

    jump_score = get_score_jump(user_dict, user_db)
    vpn_score = get_vpn_score(user_dict, user_db)
    print(f'device score {device_score}')
    print(f'ip_score {ip_score}')
    print(f'location_score {location_score}')
    print(f'jump_score {jump_score}')
    print(f'vpn_score {vpn_score}')

    total_score = device_score + ip_score + location_score - (jump_score + vpn_score)
    st.session_state['score_dict'] = {
        "device_score": device_score,
        "ip_score": ip_score,
        "location_score": location_score,
        "jump_score": jump_score,
        "vpn_score": vpn_score,
        "total_score": total_score,
    }
    return total_score


# User verification
def verify_user(user_dict):
    """Decide whether the current login looks like the real user.

    The login is accepted when the user has no history, or when the trust
    score from ``get_risk_score`` reaches the risk threshold (30). The
    threshold and score breakdown are stored in ``st.session_state``.

    Returns:
        ``True`` if verified, ``False`` otherwise.
    """
    username = user_dict.get('username', '')

    # Column names for the tuple of lists returned by get_login_history.
    col = ['login_time', 'device_name', 'device_uuid', 'mac_address',
           'device_vendor', 'device_model', 'device_ram',
           'ip_v4', 'ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon',
           'isp_name', 'isp_org',
           'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
           'lat', 'lon', 'suburb', 'district', 'city', 'country']
    # Query the history once (it used to be fetched twice).
    user_db = pd.DataFrame(get_login_history(username)).T
    user_db.columns = col

    if len(user_db) == 0:
        print('This is the 1st login time of this username')
        st.session_state['score_dict'] = {}
        return True

    # 2. Velocity inputs for the jump score: velocity (H: 600+,
    # M: 80 - 600, L: 40 - 80) and jump frequency (rare or usual).
    user_db['login_time'] = pd.to_datetime(user_db['login_time'])
    user_db['pre_lat'] = user_db['lat'].shift(periods=1, fill_value=0)
    user_db['pre_lon'] = user_db['lon'].shift(periods=1, fill_value=0)
    # No fill value here: an integer fill on a datetime column is rejected
    # by pandas >= 2. The first row gets NaT, which get_vel_all never reads.
    user_db['pre_time'] = user_db['login_time'].shift(periods=1)
    user_db['vel'] = user_db.apply(get_vel_all, axis=1)

    trust_score = get_risk_score(user_dict, user_db)
    risk_threshold = 30
    st.session_state['risk_threshold'] = risk_threshold
    print(f'trust_score is {trust_score}')
    verification = bool(trust_score >= risk_threshold)
    print(f'verification {verification}')
    return verification


# @Dora: This is to put the function face verification
# Face verification
def verify_face(username, img_file_buffer):
    """Placeholder face check: always returns ``True``.

    TODO(Dora): compare *img_file_buffer* with the latest image from
    ``read_user_image(username)`` via ``face_verfication``, saving the new
    image with ``save_user_image`` when the check passes.
    """
    face_verification = True
    return face_verification