Linh Vuu
added files
c44d66d
raw history blame
No virus
23.2 kB
# Import libraries
import os
import streamlit as st
import pandas as pd
import platform, uuid, psutil
import requests
import json
from requests import get
from geopy.geocoders import Nominatim
from getmac import get_mac_address as gma
from pathlib import Path
from streamlit_extras.switch_page_button import switch_page
from geopy.distance import geodesic as GD
from datetime import datetime, timedelta
import matplotlib
# from face_verification_helper import face_verfication
# Security
#passlib,hashlib,bcrypt,scrypt
import hashlib
# DB Management
import sqlite3
def make_hashes(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    The password is turned into bytes with ``str.encode`` (default encoding)
    before hashing. No salt is applied.
    """
    encoded = str.encode(password)
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
def check_hashes(password, hashed_text):
    """Check a plain-text password against a previously stored hash.

    Returns ``hashed_text`` itself when ``make_hashes(password)`` equals it,
    otherwise ``False``.
    """
    is_match = make_hashes(password) == hashed_text
    return hashed_text if is_match else False
# DB Functions
# Create table store username and password
def create_user_table():
    """Create the ``users`` table in ``data.db`` if it does not exist yet.

    The table stores an auto-incremented ``user_id`` plus the ``username``
    and ``password`` text columns.
    """
    conn = sqlite3.connect('data.db')
    try:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS users('
            'user_id INTEGER PRIMARY KEY AUTOINCREMENT, '
            'username TEXT NOT NULL, password TEXT NOT NULL)'
        )
        conn.commit()
    finally:
        # Always release the connection; previously only the cursor was
        # closed and the connection itself leaked on every call.
        conn.close()
def add_user_data(username, password):
    """Insert a new row into the ``users`` table of ``data.db``.

    ``password`` is stored exactly as given. The ``users`` table must
    already exist.
    """
    conn = sqlite3.connect('data.db')
    try:
        conn.execute(
            'INSERT INTO users(username, password) VALUES (?,?)',
            (username, password),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert fails (it used to leak).
        conn.close()
def login_user(username, password):
    """Return all ``users`` rows matching ``username`` and ``password``.

    The comparison is an exact match on both columns, so ``password`` must
    be in the same form in which it was stored. Returns a list of row
    tuples, which is empty when nothing matches.
    """
    conn = sqlite3.connect('data.db')
    try:
        cursor = conn.execute(
            'SELECT * FROM users WHERE username =? AND password = ?',
            (username, password),
        )
        return cursor.fetchall()
    finally:
        # Close the connection on every path (it used to leak).
        conn.close()
# def view_all_users():
# Access database
# conn = sqlite3.connect('data.db')
# c = conn.cursor()
# c.execute('SELECT * FROM users')
# data = c.fetchall()
# c.close()
# return data
# Export data to CSV
def export_csv():
    """Dump the ``login`` and ``users`` tables of ``data.db`` to CSV files.

    Writes ``login.csv`` and ``users.csv`` into the current working
    directory, without the DataFrame index column.
    """
    exports = (
        ('SELECT * FROM login', 'login.csv'),
        ('SELECT * FROM users', 'users.csv'),
    )
    conn = sqlite3.connect('data.db')
    try:
        for query, csv_name in exports:
            pd.read_sql_query(query, conn).to_csv(csv_name, index=False)
    finally:
        # The connection was never closed before; release it explicitly.
        conn.close()
# Create table to store login data
def create_login_table():
    """Create the ``login`` table in ``data.db`` if it does not exist yet.

    Besides the auto-incremented ``login_id`` the table has one TEXT column
    per collected attribute: user name, login time, device details, IP/ISP
    details, VPN/proxy/Tor/relay flags and the reverse-geocoded location.
    """
    conn = sqlite3.connect('data.db')
    try:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS login('
            'login_id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, '
            'login_time TEXT NOT NULL, '
            'device_name TEXT, device_uuid TEXT, mac_address TEXT, '
            'device_vendor TEXT, device_model TEXT, device_ram TEXT, '
            'ip_v4 TEXT, ip_country TEXT, ip_region TEXT, ip_city TEXT, '
            'ip_lat TEXT, ip_lon TEXT, isp_name TEXT, isp_org TEXT, '
            'is_vpn TEXT, is_proxy TEXT, is_tor TEXT, is_relay TEXT, '
            'lat TEXT, lon TEXT, suburb TEXT, district TEXT, city TEXT, country TEXT)'
        )
        conn.commit()
    finally:
        # Release the connection; previously only the cursor was closed.
        conn.close()
# Add login data to database
def add_login_data(user_dict):
    """Store one login record in the ``login`` table of ``data.db``.

    The table is created first if necessary. The values are taken from
    ``user_dict`` through ``get_from_user_dict``, which yields them in the
    same order as the column list below and substitutes ``''`` for missing
    keys.
    """
    # Create login table if not existed
    create_login_table()
    # Values arrive in column order, so they can be bound directly.
    values = get_from_user_dict(user_dict)
    placeholders = ', '.join('?' * len(values))
    conn = sqlite3.connect('data.db')
    try:
        conn.execute(
            'INSERT INTO login(username, login_time, '
            'device_name, device_uuid, mac_address, device_vendor, device_model, device_ram, '
            'ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org, '
            'is_vpn, is_proxy, is_tor, is_relay, '
            'lat, lon, suburb, district, city, country) '
            'VALUES (' + placeholders + ')',
            values,
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails (it used to leak).
        conn.close()
# Get login data of the user
def get_login_data(username):
    """Return every ``login`` row recorded for ``username``.

    The ``login`` table is created first if it does not exist, so the
    query also works on a fresh database. Returns a list of row tuples in
    table column order, which is empty when the user has no history.
    """
    # Create login table if not existed
    create_login_table()
    conn = sqlite3.connect('data.db')
    try:
        cursor = conn.execute('SELECT * FROM login WHERE username = ?', (username,))
        return cursor.fetchall()
    finally:
        # Close the connection on every path (it used to leak).
        conn.close()
# Get data from user dictionary
def get_from_user_dict(user_dict):
    """Flatten ``user_dict`` into a fixed-order tuple of 26 values.

    The order is: username, login_time, the six device fields, the eight
    IP/ISP fields, the four VPN/proxy/Tor/relay flags and the six location
    fields. Any key that is missing from ``user_dict`` yields ``''``.
    """
    ordered_keys = (
        'username', 'login_time',
        # 'typing_speed' is currently not collected
        'device_name', 'device_uuid', 'mac_address',
        'device_vendor', 'device_model', 'device_ram',
        'ip_v4', 'ip_country', 'ip_region', 'ip_city',
        'ip_lat', 'ip_lon', 'isp_name', 'isp_org',
        'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
        'lat', 'lon', 'suburb', 'district', 'city', 'country',
    )
    return tuple(user_dict.get(key, '') for key in ordered_keys)
def get_from_api(url, value="", timeout=10):
    """Fetch ``url + value`` with HTTP GET and return the decoded body.

    Args:
        url: Base URL of the API endpoint.
        value: Text appended verbatim to ``url`` (e.g. an IP or MAC address).
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled server would block the caller indefinitely.

    Raises:
        Exception: If the response status code is not 200.
    """
    response = get(url + value, timeout=timeout)
    if response.status_code != 200:
        raise Exception("[!] Invalid request!")
    return response.content.decode()
def get_ip_info(ip_v4):
    """Look up geolocation, ISP and anonymiser flags for an IPv4 address.

    Geolocation and ISP data are read from the ip-api.com JSON response;
    the VPN / proxy / Tor / relay flags come from the ``security`` object
    of the vpnapi.io response. The vpnapi.io key is read from
    ``st.secrets["vpn_api_key"]``.

    Returns:
        Tuple ``(ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name,
        isp_org, is_vpn, is_proxy, is_tor, is_relay)``.

    Raises:
        KeyError: If either response lacks one of the expected fields.
    """
    # Get information from the ipv4 and convert the JSON text to a dict
    isp = json.loads(get_from_api("http://ip-api.com/json/", ip_v4))
    ip_country = isp["country"]
    ip_region = isp["regionName"]
    ip_city = isp["city"]
    ip_lat = isp["lat"]
    ip_lon = isp["lon"]
    isp_name = isp["isp"]
    isp_org = isp["org"]

    # Detect VPN / proxy / tor
    vpn_api_key = st.secrets["vpn_api_key"]
    # Pass the key through ``params`` so it is URL-encoded, and bound the
    # wait so a stalled service cannot hang the login flow forever.
    response = requests.get(
        "https://vpnapi.io/api/" + ip_v4,
        params={"key": vpn_api_key},
        timeout=10,
    )
    security = json.loads(response.text)["security"]
    is_vpn = security['vpn']
    is_proxy = security['proxy']
    is_tor = security['tor']
    is_relay = security['relay']

    return (ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,
            is_vpn, is_proxy, is_tor, is_relay)
def get_location(lat, lon):
    """Reverse-geocode a coordinate pair with Nominatim.

    Args:
        lat: Latitude as a string.
        lon: Longitude as a string.

    Returns:
        Tuple ``(location, suburb, district, city, country)``. ``location``
        is whatever ``Nominatim.reverse`` returned; the other items are
        strings taken from its address and default to ``''``. ``district``
        prefers the ``city_district`` entry and falls back to ``district``.
        When the geocoder finds nothing, ``location`` is ``None`` and all
        strings are empty.
    """
    # Get address from given coordinate
    geolocator = Nominatim(user_agent="BAAM")
    location = geolocator.reverse(lat + "," + lon)
    if location is None:
        # No result for this coordinate: report empty fields rather than
        # failing on ``location.raw``.
        return location, '', '', '', ''

    address = location.raw.get('address', {})
    suburb = address.get('suburb', '')
    district = address.get('city_district', '') or address.get('district', '')
    city = address.get('city', '')
    country = address.get('country', '')
    return location, suburb, district, city, country
# def collect_data(username, result, login_time, typing_speed):
def collect_data(username, result, login_time):
    """Assemble the login record for the current login attempt.

    Args:
        username: Name of the user logging in.
        result: Mapping that may contain a ``"GET_LOCATION"`` entry with
            ``"lat"`` and ``"lon"`` values; may be empty or ``None``.
        login_time: Time of the login attempt, stored as given.

    Returns:
        Tuple ``(user_dict, location_text)``. ``user_dict`` holds the keys
        consumed by ``get_from_user_dict``; ``location_text`` is
        ``str(location)`` for the reverse-geocoded location, or ``''`` when
        no coordinates were available.
    """
    lat = ''
    lon = ''
    suburb = ''
    district = ''
    city = ''
    country = ''
    # Default needed: without coordinates ``location`` was never assigned
    # and the final ``str(location)`` raised UnboundLocalError.
    location = ''

    if result and "GET_LOCATION" in result:
        coordinates = result.get("GET_LOCATION")
        lat = str(coordinates["lat"])
        lon = str(coordinates["lon"])
    if lat and lon:
        location, suburb, district, city, country = get_location(lat, lon)

    # Collect device information
    # NOTE(review): these calls describe the machine executing this code -
    # confirm that this is the device that is meant to be profiled.
    device_name = platform.node()
    device_uuid = uuid.getnode()
    mac_address = gma()
    device_vendor = get_from_api("https://api.macvendors.com/", mac_address)
    device_model = platform.platform()
    device_ram = str(round(psutil.virtual_memory().total / (1024.0 **3)))+" GB"

    # Collect IP information
    ip_v4 = get_from_api('https://api.ipify.org')
    (ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,
     is_vpn, is_proxy, is_tor, is_relay) = get_ip_info(ip_v4)

    user_dict = {
        "username": username,
        "login_time": login_time,
        # "typing_speed": typing_speed,
        "device_name": device_name,
        "device_uuid": device_uuid,
        "mac_address": mac_address,
        "device_vendor": device_vendor,
        "device_model": device_model,
        "device_ram": device_ram,
        "ip_v4": ip_v4,
        "ip_country": ip_country,
        "ip_region": ip_region,
        "ip_city": ip_city,
        "ip_lat": ip_lat,
        "ip_lon": ip_lon,
        "isp_name": isp_name,
        "isp_org": isp_org,
        "is_vpn": is_vpn,
        "is_proxy": is_proxy,
        "is_tor": is_tor,
        "is_relay": is_relay,
        "lat": lat,
        "lon": lon,
        "suburb": suburb,
        "district": district,
        "city": city,
        "country": country,
    }
    return user_dict, str(location)
# Retrieve login history of the user
def get_login_history(username):
    """Return the user's login history as one list per stored attribute.

    Each row returned by ``get_login_data`` is split column-wise: row
    positions 2 to 26 (``login_time`` through ``country``) are appended to
    25 parallel lists. Positions 0 and 1 are not included.

    Returns:
        Tuple of 25 lists, in table column order starting at
        ``login_time``. All lists are empty when there is no history.
    """
    first_index = 2   # position of login_time within a row
    last_index = 26   # position of country within a row
    # typing_speed is currently not stored, so it has no list here.
    histories = [[] for _ in range(first_index, last_index + 1)]

    for row in get_login_data(username) or ():
        for offset, history in enumerate(histories):
            history.append(row[first_index + offset])

    return tuple(histories)
def submit_test_case(user_dict, location):
    """Render the "Start test case" button and run verification on click.

    When the button is pressed, ``verify_user`` is run on ``user_dict``.
    ``location``, ``user_dict`` and the verification result are stored in
    ``st.session_state`` for the other pages. The app then switches to the
    "TestPass" page on success or the "InputImage" page on failure.
    Nothing happens while the button is not pressed.
    """
    if not st.button("Start test case"):
        return

    # Verify the test case against the user's historical data.
    # (A face-verification fallback on failure is currently disabled.)
    verification = verify_user(user_dict)

    # Share the results with the other pages.
    st.session_state['location'] = location
    st.session_state['user_dict'] = user_dict
    st.session_state['verification'] = verification

    # Passed -> success page, failed -> image input page.
    switch_page("TestPass" if verification else "InputImage")
def show_test_data(user_dict, location):
# Display the collected login data on the current Streamlit page.
# user_dict: login record; values are read through get_from_user_dict,
#            so missing keys are shown as ''.
# location:  value shown on the 'Location:' line.
# NOTE(review): this file has lost its indentation, so it is not visible
# whether the final 'Login time' line belongs to the second column or is
# written below both columns - confirm against the original layout.
# Current information
username, login_time,\
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\
is_vpn, is_proxy, is_tor, is_relay,\
lat, lon, suburb, district, city, country = get_from_user_dict(user_dict)
# Show location
st.write('Location:', location)
# Two side-by-side columns for the detail fields
col1, col2 = st.columns(2)
with col1:
# Show IP info and the VPN / proxy / Tor / relay flags
st.write('IP address:', ip_v4)
st.write('IP region:', ip_region)
st.write('IP city:', ip_city)
st.write('IP country:', ip_country)
st.write('Is VPN?', is_vpn)
st.write('Is Proxy?', is_proxy)
st.write('Is Tor Node?', is_tor)
st.write('Is Relay?', is_relay)
with col2:
# Show ISP and device info
st.write('ISP Name:', isp_name)
st.write('ISP Organisation:', isp_org)
st.write('Device Mac Address:', mac_address)
st.write('Device UUID:', device_uuid)
st.write('Device Name:', device_name)
st.write('Device Vendor:', device_vendor)
st.write('Device Model:', device_model)
st.write('Device Ram:', device_ram)
# Show Login time
st.write('Login time:', login_time)
def save_user_image(username, image, input_time):
    """Write an uploaded image to the user's image directory.

    Args:
        username: Used for both the directory and the file name.
        image: Object exposing ``getbuffer()`` whose content is written.
        input_time: Number whose integer part becomes the file-name suffix.

    Returns:
        The path of the written file,
        ``img/user_image/<username>/<username>_<int(input_time)>.jpg``.
    """
    # NOTE(review): ``username`` is embedded in the path unchecked - confirm
    # it cannot contain path separators or ``..``.
    save_dir = f"img/user_image/{username}"
    save_file_path = f"{save_dir}/{username}_{int(input_time)}.jpg"
    # exist_ok avoids the check-then-create race of testing for the
    # directory before making it.
    os.makedirs(save_dir, exist_ok=True)
    with open(save_file_path, mode='wb') as w:
        w.write(image.getbuffer())
    return save_file_path
def read_user_image(username):
    """Return the path of the user's most recent stored image, if any.

    Looks in ``img/user_image/<username>`` (created when missing) for files
    with a ``.jpg``, ``.png`` or ``.jpeg`` extension, compared
    case-insensitively, and returns the one with the greatest
    ``os.path.getctime``. Returns ``None`` when there is no such file.
    """
    image_dir = f"img/user_image/{username}"
    os.makedirs(image_dir, exist_ok=True)

    # Use the real extension: splitting on the first "." raised IndexError
    # for names without a dot and misread names with several dots
    # (for example when the username itself contains one).
    image_suffixes = (".jpg", ".png", ".jpeg")
    list_image_path = [
        os.path.join(image_dir, name)
        for name in os.listdir(image_dir)
        if os.path.splitext(name)[1].lower() in image_suffixes
    ]

    if not list_image_path:
        return None
    return max(list_image_path, key=os.path.getctime)
# @Thao: user historical-data verification logic, used to decide whether this is the real user
# THAO LE CODE
# 0.5 Adjust the match percentage:
# a value whose count is within ~20% of the most frequent value's count is treated as a full match
def get_right_per(user_dict,user_db,match_value,per):
    """Promote a match percentage to 1 for the user's most common values.

    Args:
        user_dict: Current login record.
        user_db: DataFrame of the user's login history; must contain the
            ``match_value`` column and a ``login_time`` column.
        match_value: Name of the matched field, or ``""`` for no match.
        per: Share of history rows that matched.

    Returns:
        ``0`` when ``match_value`` is empty; ``1`` when the current value's
        row count lies between 80% of the highest count and the highest
        count itself; otherwise ``per`` unchanged.
    """
    if match_value == "":
        return 0

    tolerance = 0.2
    counts = user_db.groupby(match_value).count()["login_time"]
    highest = counts.max()
    lowest_accepted = highest * (1 - tolerance)
    current = counts[user_dict[match_value]]

    return 1 if lowest_accepted <= current <= highest else per
# 1.1 to get the match value and percentage
def check_match_per(user_dict,user_db,check = 'location'):
    """Find the last matching field of the current login and its score.

    ``check`` is one of 'location', 'device', 'ip'; any other value is
    treated as 'ip'. The fields of the chosen group are tested in order.
    A field matches when at least one history row has the same value as
    ``user_dict``. The walk stops at the first field that does not match,
    except that a non-matching ``mac_address`` is skipped.

    Returns:
        Tuple ``(match_value, final_per)``. ``match_value`` is the last
        field that matched, or ``''`` if none did. ``final_per`` is that
        field's share of matching history rows after adjustment by
        ``get_right_per``.
    """
    fields_by_check = {
        'location': ['country','city', 'district','suburb'],
        'device': ['device_vendor','device_model','device_name','mac_address','device_uuid'],
    }
    fields_check = fields_by_check.get(check, [ 'ip_country', 'isp_name','ip_v4'])

    matched_field = ''
    matched_share = 0
    history_size = len(user_db)

    for field in fields_check:
        hits = len(user_db[user_db[field] == user_dict[field]])
        if hits > 0:
            matched_field = field
            matched_share = hits/history_size
            continue
        # An unknown MAC address alone does not end the walk; any other
        # non-matching field does.
        if field != 'mac_address':
            break

    final_per = get_right_per(user_dict,user_db,matched_field,matched_share)
    return matched_field,final_per
# 2.1 Get velocity of all transactions
def get_vel_all(row):
# Compute the travel velocity between one login row and the previous one.
# row: mapping with 'lat', 'lon', 'login_time' and the shifted
#      'pre_lat', 'pre_lon', 'pre_time' values of the preceding login.
# Returns the distance in km divided by the elapsed time in hours; the
# initial value of vel is '' for the case where nothing is computed.
# NOTE(review): indentation was lost in this file, so it is not visible
# whether the `else` below pairs with `if interval != 0` (zero interval
# gives 0) or with `if coor_pre != (0,0)` (no previous login gives 0) -
# confirm before re-indenting.
# print(row)
dist = 0
coor = (row['lat'],row['lon'])
coor_pre = (row['pre_lat'],row['pre_lon'])
interval = ''
vel = ''
# (0,0) is the fill value verify_user uses for a row without predecessor
if coor_pre != (0,0):
# Geodesic distance in kilometres between the two coordinates
dist = GD(coor,coor_pre).km
# Elapsed time between the two logins, converted from seconds to hours
interval = (row['login_time'] - row['pre_time']).total_seconds()/(60*60)
# interval = (row['login_time'] - row['pre_time']).days
if interval != 0:
vel = dist/interval
else:
vel = 0
return vel
# 2.2 get vel of the latest txn
def get_vel_txn(user_dict,user_db):
    """Return the velocity between the new login and the latest stored one.

    Args:
        user_dict: Current login record; ``lat``/``lon`` give its
            coordinates and ``login_time`` is either a datetime or a
            ``'%Y-%m-%d %H:%M:%S'`` string.
        user_db: Non-empty DataFrame of the user's history whose last row
            is the latest login and whose ``login_time`` column holds
            datetimes.

    Returns:
        Geodesic distance in km divided by the elapsed time in hours, or
        ``0`` when no time has elapsed.
    """
    latest_txn = user_db.iloc[-1]
    coor_txn = (user_dict['lat'],user_dict['lon'])
    coor_latest = (float(latest_txn['lat']),float(latest_txn['lon']))
    dist = GD(coor_txn,coor_latest).km

    # Parse only when a string was supplied, instead of relying on a bare
    # ``except`` that would also hide unrelated errors.
    time_txn = user_dict['login_time']
    if isinstance(time_txn, str):
        time_txn = datetime.strptime(time_txn,'%Y-%m-%d %H:%M:%S')
    interval = (time_txn - latest_txn['login_time']).total_seconds()/(60*60)

    try:
        vel = dist/interval
    except ZeroDivisionError:
        # Same timestamp as the previous login: treat as no movement.
        vel = 0
    print(f'This is vel {vel}')
    return vel
# 2 to get score will be reduced because of jumping
def get_score_jump (user_dict,user_db):
    """Return the penalty for an implausibly fast change of location.

    The velocity of the new login (``get_vel_txn``) is classified as
    high (> 600), medium (> 80) or low (> 40) and mapped to a weight of
    20, 10 or 5. When more than 10% of the history rows already have a
    velocity above the low threshold, the weight is multiplied by 0.1,
    otherwise by 1. Velocities of 40 or less give ``0``.
    """
    threshold_vel = {'H': 600, 'M': 80, 'L': 40, 'frequency': 0.1}
    weight_vel = {'H': 20, 'M': 10, 'L': 5}
    many_vel = {'Y': 0.1, 'N': 1}

    def _is_jump(row):
        # Rows without a computed velocity never count as a jump.
        if row.vel == "":
            return False
        return float(row.vel) > threshold_vel['L']

    # to get how many jumping - low or not
    jump_rows = user_db[user_db.apply(_is_jump, axis=1)]
    frequent = len(jump_rows) > len(user_db)*threshold_vel['frequency']
    many_jump = 'Y' if frequent else 'N'

    vel_txn = float(get_vel_txn(user_dict,user_db))
    for level in ('H', 'M', 'L'):
        if vel_txn > threshold_vel[level]:
            return many_vel[many_jump] * weight_vel[level]
    return 0
# 3.1 check vpn (new IP + vpn)
def get_vpn_score (user_dict,user_db):
    """Return the penalty for logging in through an anonymiser on a new IP.

    The penalty of 10 applies when the sum of the ``is_vpn``, ``is_proxy``,
    ``is_tor`` and ``is_relay`` values in ``user_dict`` is positive and the
    IP check (``check_match_per`` with ``check='ip'``) did not match down
    to ``ip_v4``. Otherwise the result is ``0``.
    """
    weight_vpn = 10
    flag_total = sum(
        user_dict[flag] for flag in ('is_vpn','is_proxy', 'is_tor','is_relay')
    )

    deepest_ip_match = check_match_per(user_dict,user_db,check = 'ip')[0]
    if deepest_ip_match == 'ip_v4' or not flag_total > 0:
        return 0
    return weight_vpn
# 4.Get score
def get_risk_score (user_dict,user_db):
    """Compute the overall score of a login attempt.

    For each of the device, IP and location checks, the weight of the
    last matching field is multiplied by the percentage returned by
    ``check_match_per`` (``0`` when nothing matched). The jump and VPN
    penalties are then subtracted from the sum.

    The individual scores are printed and stored, together with the total,
    in ``st.session_state['score_dict']``.

    Returns:
        ``device_score + ip_score + location_score - (jump_score + vpn_score)``.
    """
    weight = {
        'device_uuid': 40, 'mac_address': 40, 'device_name': 30.0,
        'device_model': 20.0, 'device_vendor': 4.0,
        'ip_v4': 30, 'isp_name': 15.0, 'ip_country': 3.0,
        'suburb': 30, 'district': 22.5, 'city': 15.0, 'country': 3.0,
    }

    def _weighted_score(check):
        matched_field, share = check_match_per(user_dict,user_db,check=check)
        if matched_field == '':
            return 0
        return weight[matched_field] * share

    device_score = _weighted_score('device')
    ip_score = _weighted_score('ip')
    # check location
    location_score = _weighted_score('location')
    jump_score = get_score_jump (user_dict,user_db)
    vpn_score = get_vpn_score (user_dict,user_db)

    for label, value in (
        ('device score', device_score),
        ('ip_score', ip_score),
        ('location_score', location_score),
        ('jump_score', jump_score),
        ('vpn_score', vpn_score),
    ):
        print(f'{label} {value}')

    total_score = device_score+ip_score+location_score-(jump_score + vpn_score)
    st.session_state['score_dict'] = {
        "device_score": device_score,
        "ip_score": ip_score,
        "location_score": location_score,
        "jump_score": jump_score,
        "vpn_score": vpn_score,
        "total_score": total_score,
    }
    return total_score
# User verification
def verify_user(user_dict):
    """Decide whether the current login looks like the real user.

    The user's login history is loaded into a DataFrame. A user without
    history is accepted and an empty ``score_dict`` is stored in
    ``st.session_state``. Otherwise the velocity between consecutive
    historical logins is computed, ``get_risk_score`` is evaluated, and
    the login is rejected when that score is below the threshold of 30.
    The threshold is stored in ``st.session_state['risk_threshold']``.

    Args:
        user_dict: Current login record; ``username`` selects the history.

    Returns:
        ``True`` when the login is accepted, ``False`` otherwise.
    """
    username = user_dict.get('username', '')

    col = ['login_time','device_name', 'device_uuid','mac_address', 'device_vendor', 'device_model', 'device_ram',
           'ip_v4','ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon', 'isp_name','isp_org',
           'is_vpn', 'is_proxy', 'is_tor', 'is_relay',
           'lat', 'lon','suburb', 'district', 'city', 'country']
    # One list per column, transposed into one row per login. The history
    # is read once; it was previously queried twice, with one result unused.
    user_db = pd.DataFrame(get_login_history(username)).T
    user_db.columns = col

    if len(user_db) == 0:
        print('This is the 1st login time of this username')
        st.session_state['score_dict'] = {}
        return True

    # 2. Prepare score_jumping input: velocity (H: 600+, M: 80 - 600, L: 40 - 80)
    # and jump frequency (rare or usual)
    user_db.login_time = pd.to_datetime(user_db.login_time)
    # The first row has no predecessor; (0, 0) marks that for get_vel_all.
    user_db['pre_lat'] = user_db['lat'].shift(periods=1, fill_value=0)
    user_db['pre_lon'] = user_db['lon'].shift(periods=1, fill_value=0)
    # Leave the first pre_time as NaT: an integer fill value is not a valid
    # datetime and is rejected by recent pandas. get_vel_all does not read
    # pre_time for the row whose previous coordinates are (0, 0).
    user_db['pre_time'] = user_db['login_time'].shift(periods=1)
    user_db['vel'] = user_db.apply(get_vel_all, axis=1)

    trust_score = get_risk_score (user_dict,user_db)
    risk_threshold = 30
    st.session_state['risk_threshold'] = risk_threshold
    print(f'trust_score is {trust_score}')

    verification = not trust_score < risk_threshold
    print(f'verification {verification}')
    return verification
# @Dora: This is to put the function face verification
# Face verification
def verify_face(username, img_file_buffer):
    """Placeholder for face verification; currently always returns ``True``.

    Neither argument is used while the real implementation below is
    disabled.
    """
    # Disabled implementation, kept for reference:
    # input_time = datetime.now().timestamp()
    # latest_history_image = read_user_image(username)
    # if latest_history_image:
    #     image_path = save_user_image(username, img_file_buffer, input_time)
    #     face_verification = face_verfication([[latest_history_image,image_path]])
    #     os.remove(image_path)
    # else:
    #     face_verification = True
    # if face_verification:
    #     image_path = save_user_image(username, img_file_buffer, input_time)
    # print(f"verify face:{face_verification}")
    return True