|
|
|
import os |
|
import streamlit as st |
|
import pandas as pd |
|
import platform, uuid, psutil |
|
import requests |
|
import json |
|
from requests import get |
|
from geopy.geocoders import Nominatim |
|
from getmac import get_mac_address as gma |
|
from pathlib import Path |
|
from streamlit_extras.switch_page_button import switch_page |
|
|
|
from geopy.distance import geodesic as GD |
|
from datetime import datetime, timedelta |
|
import matplotlib |
|
|
|
|
|
|
|
|
|
import hashlib |
|
|
|
|
|
import sqlite3 |
|
|
|
def make_hashes(password): |
|
return hashlib.sha256(str.encode(password)).hexdigest() |
|
|
|
def check_hashes(password, hashed_text): |
|
if make_hashes(password) == hashed_text: |
|
return hashed_text |
|
return False |
|
|
|
|
|
|
|
def create_user_table(): |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
c.execute('CREATE TABLE IF NOT EXISTS users(user_id INTEGER PRIMARY KEY AUTOINCREMENT,\ |
|
username TEXT NOT NULL, password TEXT NOT NULL)') |
|
c.close() |
|
|
|
def add_user_data(username, password): |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
c.execute('INSERT INTO users(username, password) VALUES (?,?)',(username,password)) |
|
conn.commit() |
|
c.close() |
|
|
|
def login_user(username, password): |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
c.execute('SELECT * FROM users WHERE username =? AND password = ?',(username,password)) |
|
data = c.fetchall() |
|
c.close() |
|
return data |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_csv(): |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
|
|
|
|
db_df = pd.read_sql_query('SELECT * FROM login', conn) |
|
db_df.to_csv('login.csv', index=False) |
|
|
|
|
|
db_df = pd.read_sql_query('SELECT * FROM users', conn) |
|
db_df.to_csv('users.csv', index=False) |
|
|
|
|
|
def create_login_table(): |
|
|
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
|
|
c.execute('CREATE TABLE IF NOT EXISTS login(login_id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL,\ |
|
login_time TEXT NOT NULL,\ |
|
device_name TEXT, device_uuid TEXT, mac_address TEXT, device_vendor TEXT, device_model TEXT, device_ram TEXT,\ |
|
ip_v4 TEXT, ip_country TEXT, ip_region TEXT, ip_city TEXT, ip_lat TEXT, ip_lon TEXT, isp_name TEXT, isp_org TEXT,\ |
|
is_vpn TEXT, is_proxy TEXT, is_tor TEXT, is_relay TEXT,\ |
|
lat TEXT, lon TEXT, suburb TEXT, district TEXT, city TEXT, country TEXT)') |
|
|
|
c.close() |
|
|
|
|
|
def add_login_data(user_dict): |
|
|
|
|
|
create_login_table() |
|
|
|
|
|
username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country= get_from_user_dict(user_dict) |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
|
|
|
|
c.execute('INSERT INTO login(username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country)\ |
|
VALUES (?, ?,\ |
|
?, ?, ?, ?, ?, ?,\ |
|
?, ?, ?, ?, ?, ?, ?, ?,\ |
|
?, ?, ?, ?,\ |
|
?, ?, ?, ?, ?, ?)',\ |
|
(username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country)) |
|
|
|
conn.commit() |
|
c.close() |
|
|
|
|
|
def get_login_data(username): |
|
|
|
|
|
create_login_table() |
|
|
|
|
|
conn = sqlite3.connect('data.db') |
|
c = conn.cursor() |
|
|
|
c.execute('SELECT * FROM login WHERE username = ?',(username,)) |
|
records = c.fetchall() |
|
c.close() |
|
|
|
return records |
|
|
|
|
|
def get_from_user_dict(user_dict): |
|
|
|
username = user_dict.get('username', '') |
|
login_time = user_dict.get('login_time', '') |
|
|
|
device_name = user_dict.get('device_name', '') |
|
device_uuid = user_dict.get('device_uuid', '') |
|
mac_address = user_dict.get('mac_address', '') |
|
device_vendor = user_dict.get('device_vendor', '') |
|
device_model = user_dict.get('device_model', '') |
|
device_ram = user_dict.get('device_ram', '') |
|
ip_v4 = user_dict.get('ip_v4', '') |
|
ip_country = user_dict.get('ip_country', '') |
|
ip_region = user_dict.get('ip_region', '') |
|
ip_city = user_dict.get('ip_city', '') |
|
ip_lat = user_dict.get('ip_lat', '') |
|
ip_lon = user_dict.get('ip_lon', '') |
|
isp_name = user_dict.get('isp_name', '') |
|
isp_org = user_dict.get('isp_org', '') |
|
is_vpn = user_dict.get('is_vpn', '') |
|
is_proxy = user_dict.get('is_proxy', '') |
|
is_tor = user_dict.get('is_tor', '') |
|
is_relay = user_dict.get('is_relay', '') |
|
lat = user_dict.get('lat', '') |
|
lon = user_dict.get('lon', '') |
|
suburb = user_dict.get('suburb', '') |
|
district = user_dict.get('district', '') |
|
city = user_dict.get('city', '') |
|
country = user_dict.get('country', '') |
|
|
|
|
|
return username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country |
|
|
|
def get_from_api(url, value=""): |
|
|
|
|
|
response = get(url + value) |
|
if response.status_code != 200: |
|
raise Exception("[!] Invalid request!") |
|
|
|
return response.content.decode() |
|
|
|
def get_ip_info(ip_v4): |
|
|
|
|
|
isp = get_from_api("http://ip-api.com/json/", ip_v4) |
|
|
|
|
|
isp = json.loads(isp) |
|
|
|
|
|
ip_country = isp["country"] |
|
ip_region = isp["regionName"] |
|
ip_city = isp["city"] |
|
ip_lat = isp["lat"] |
|
ip_lon = isp["lon"] |
|
isp_name = isp["isp"] |
|
isp_org = isp["org"] |
|
|
|
|
|
vpn_api_key = st.secrets["vpn_api_key"] |
|
response = requests.get("https://vpnapi.io/api/" + ip_v4 + "?key=" + vpn_api_key) |
|
data = json.loads(response.text) |
|
|
|
is_vpn = data["security"]['vpn'] |
|
is_proxy = data["security"]['proxy'] |
|
is_tor = data["security"]['tor'] |
|
is_relay = data["security"]['relay'] |
|
|
|
return ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org, is_vpn, is_proxy, is_tor, is_relay |
|
|
|
def get_location(lat, lon): |
|
|
|
suburb = '' |
|
district = '' |
|
city = '' |
|
country = '' |
|
|
|
|
|
geolocator = Nominatim(user_agent="BAAM") |
|
|
|
location = geolocator.reverse(lat + "," + lon) |
|
address = location.raw['address'] |
|
|
|
suburb = address.get('suburb', '') |
|
|
|
if address.get('city_district', ''): |
|
district = address.get('city_district', '') |
|
else: |
|
district = address.get('district', '') |
|
|
|
city = address.get('city', '') |
|
country = address.get('country', '') |
|
|
|
return location, suburb, district, city, country |
|
|
|
|
|
def collect_data(username, result, login_time): |
|
|
|
lat = '' |
|
lon = '' |
|
suburb = '' |
|
district = '' |
|
city = '' |
|
country = '' |
|
|
|
if "GET_LOCATION" in result: |
|
lat = str(result.get("GET_LOCATION")["lat"]) |
|
lon = str(result.get("GET_LOCATION")["lon"]) |
|
|
|
if lat and lon: |
|
location, suburb, district, city, country = get_location(lat, lon) |
|
|
|
|
|
device_name = platform.node() |
|
device_uuid = uuid.getnode() |
|
mac_address = gma() |
|
device_vendor = get_from_api("https://api.macvendors.com/", mac_address) |
|
device_model = platform.platform() |
|
device_ram = str(round(psutil.virtual_memory().total / (1024.0 **3)))+" GB" |
|
|
|
|
|
ip_v4 = get_from_api('https://api.ipify.org') |
|
ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org, is_vpn, is_proxy, is_tor, is_relay = get_ip_info(ip_v4) |
|
|
|
user_dict = { |
|
"username": username, |
|
"login_time": login_time, |
|
|
|
"device_name": device_name, |
|
"device_uuid": device_uuid, |
|
"mac_address": mac_address, |
|
"device_vendor": device_vendor, |
|
"device_model": device_model, |
|
"device_ram": device_ram, |
|
"ip_v4": ip_v4, |
|
"ip_country": ip_country, |
|
"ip_region": ip_region, |
|
"ip_city": ip_city, |
|
"ip_lat": ip_lat, |
|
"ip_lon": ip_lon, |
|
"isp_name": isp_name, |
|
"isp_org": isp_org, |
|
"is_vpn": is_vpn, |
|
"is_proxy": is_proxy, |
|
"is_tor": is_tor, |
|
"is_relay": is_relay, |
|
"lat": lat, |
|
"lon": lon, |
|
"suburb": suburb, |
|
"district": district, |
|
"city": city, |
|
"country": country |
|
} |
|
|
|
return user_dict, str(location) |
|
|
|
|
|
def get_login_history(username): |
|
|
|
login_time_history = [] |
|
|
|
device_name_history = [] |
|
device_uuid_history = [] |
|
mac_address_history = [] |
|
device_vendor_history = [] |
|
device_model_history = [] |
|
device_ram_history = [] |
|
ip_v4_history = [] |
|
ip_country_history = [] |
|
ip_region_history = [] |
|
ip_city_history = [] |
|
ip_lat_history = [] |
|
ip_lon_history = [] |
|
isp_name_history = [] |
|
isp_org_history = [] |
|
is_vpn_history = [] |
|
is_proxy_history = [] |
|
is_tor_history = [] |
|
is_relay_history = [] |
|
lat_history = [] |
|
lon_history = [] |
|
suburb_history = [] |
|
district_history = [] |
|
city_history = [] |
|
country_history = [] |
|
|
|
login_data = get_login_data(username) |
|
|
|
if login_data: |
|
|
|
for row in login_data: |
|
login_time_history.append(row[2]) |
|
|
|
device_name_history.append(row[3]) |
|
device_uuid_history.append(row[4]) |
|
mac_address_history.append(row[5]) |
|
device_vendor_history.append(row[6]) |
|
device_model_history.append(row[7]) |
|
device_ram_history.append(row[8]) |
|
ip_v4_history.append(row[9]) |
|
ip_country_history.append(row[10]) |
|
ip_region_history.append(row[11]) |
|
ip_city_history.append(row[12]) |
|
ip_lat_history.append(row[13]) |
|
ip_lon_history.append(row[14]) |
|
isp_name_history.append(row[15]) |
|
isp_org_history.append(row[16]) |
|
is_vpn_history.append(row[17]) |
|
is_proxy_history.append(row[18]) |
|
is_tor_history.append(row[19]) |
|
is_relay_history.append(row[20]) |
|
lat_history.append(row[21]) |
|
lon_history.append(row[22]) |
|
suburb_history.append(row[23]) |
|
district_history.append(row[24]) |
|
city_history.append(row[25]) |
|
country_history.append(row[26]) |
|
|
|
return login_time_history,\ |
|
device_name_history, device_uuid_history, mac_address_history, device_vendor_history, device_model_history, device_ram_history,\ |
|
ip_v4_history, ip_country_history, ip_region_history, ip_city_history, ip_lat_history, ip_lon_history, isp_name_history, isp_org_history,\ |
|
is_vpn_history, is_proxy_history, is_tor_history, is_relay_history,\ |
|
lat_history, lon_history, suburb_history, district_history, city_history, country_history |
|
|
|
def submit_test_case(user_dict, location): |
|
|
|
submit_button = st.button("Start test case") |
|
|
|
|
|
if submit_button: |
|
|
|
|
|
verification = verify_user(user_dict) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.session_state['location'] = location |
|
st.session_state['user_dict'] = user_dict |
|
st.session_state['verification'] = verification |
|
|
|
|
|
if verification: |
|
|
|
|
|
switch_page("TestPass") |
|
|
|
else: |
|
|
|
|
|
switch_page("InputImage") |
|
|
|
def show_test_data(user_dict, location): |
|
|
|
|
|
username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country = get_from_user_dict(user_dict) |
|
|
|
|
|
st.write('Location:', location) |
|
|
|
col1, col2 = st.columns(2) |
|
|
|
with col1: |
|
|
|
|
|
st.write('IP address:', ip_v4) |
|
st.write('IP region:', ip_region) |
|
st.write('IP city:', ip_city) |
|
st.write('IP country:', ip_country) |
|
st.write('Is VPN?', is_vpn) |
|
st.write('Is Proxy?', is_proxy) |
|
st.write('Is Tor Node?', is_tor) |
|
st.write('Is Relay?', is_relay) |
|
|
|
with col2: |
|
|
|
|
|
st.write('ISP Name:', isp_name) |
|
st.write('ISP Organisation:', isp_org) |
|
st.write('Device Mac Address:', mac_address) |
|
st.write('Device UUID:', device_uuid) |
|
st.write('Device Name:', device_name) |
|
st.write('Device Vendor:', device_vendor) |
|
st.write('Device Model:', device_model) |
|
st.write('Device Ram:', device_ram) |
|
|
|
|
|
st.write('Login time:', login_time) |
|
|
|
def save_user_image(username, image, input_time): |
|
save_dir = f"img/user_image/{username}" |
|
save_file_path = f"img/user_image/{username}/{username}_" \ |
|
f"{int(input_time)}.jpg" |
|
if not os.path.exists(save_dir): |
|
os.makedirs(save_dir) |
|
with open(save_file_path, mode='wb') as w: |
|
w.write(image.getbuffer()) |
|
return save_file_path |
|
|
|
def read_user_image(username): |
|
list_image_path = [] |
|
image_dir = f"img/user_image/{username}" |
|
if not os.path.exists(image_dir): |
|
os.makedirs(image_dir) |
|
for x in os.listdir(image_dir): |
|
if x.split(".")[1].lower() in ("jpg", "png", "jpeg"): |
|
image_path = os.path.join(image_dir, x) |
|
list_image_path.append(image_path) |
|
if list_image_path: |
|
return max(list_image_path, key=os.path.getctime) |
|
else: |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_right_per(user_dict,user_db,match_value,per): |
|
if match_value == "": |
|
return 0 |
|
range_highest = 0.2 |
|
count_all = user_db.groupby(match_value).count().login_time |
|
up = count_all.max() |
|
down = up*(1-range_highest) |
|
|
|
if count_all[user_dict[match_value]] >= down and count_all[user_dict[match_value]] <= up: |
|
return 1 |
|
else: |
|
return per |
|
|
|
|
|
def check_match_per(user_dict,user_db,check = 'location'): |
|
""" |
|
input the check is one of 'location','device','ip' |
|
""" |
|
match_value = '' |
|
final_per = 0 |
|
per = 0 |
|
total_txn = len(user_db) |
|
|
|
if check == 'location': |
|
fields_check = ['country','city', 'district','suburb'] |
|
elif check == 'device': |
|
fields_check = ['device_vendor','device_model','device_name','mac_address','device_uuid'] |
|
else: |
|
fields_check = [ 'ip_country', 'isp_name','ip_v4'] |
|
|
|
for i in fields_check: |
|
|
|
|
|
count = len(user_db[user_db[i] == user_dict[i]]) |
|
if count > 0: |
|
|
|
match_value = i |
|
per = count/total_txn |
|
|
|
elif i == 'mac_address' and match_value != i: |
|
continue |
|
else: |
|
break |
|
|
|
final_per = get_right_per(user_dict,user_db,match_value,per) |
|
|
|
return match_value,final_per |
|
|
|
|
|
def get_vel_all(row): |
|
|
|
dist = 0 |
|
coor = (row['lat'],row['lon']) |
|
coor_pre = (row['pre_lat'],row['pre_lon']) |
|
interval = '' |
|
vel = '' |
|
if coor_pre != (0,0): |
|
dist = GD(coor,coor_pre).km |
|
interval = (row['login_time'] - row['pre_time']).total_seconds()/(60*60) |
|
|
|
if interval != 0: |
|
vel = dist/interval |
|
else: |
|
vel = 0 |
|
return vel |
|
|
|
|
|
def get_vel_txn(user_dict,user_db): |
|
""" |
|
get the velocity of the new transaction in user_dict and the latest transaction in user's history |
|
""" |
|
dist = 0 |
|
interval = '' |
|
latest_txn = user_db.iloc[[-1]] |
|
|
|
coor_txn = (user_dict['lat'],user_dict['lon']) |
|
coor_latest = (float(latest_txn['lat']),float(latest_txn['lon'])) |
|
dist = GD(coor_txn,coor_latest).km |
|
|
|
print(type(user_dict['login_time'])) |
|
try: |
|
time_txn = user_dict['login_time'] |
|
interval = (time_txn - latest_txn['login_time'].to_list()[0]).total_seconds()/(60*60) |
|
except: |
|
time_txn = datetime.strptime(user_dict['login_time'],'%Y-%m-%d %H:%M:%S') |
|
interval = (time_txn - latest_txn['login_time'].to_list()[0]).total_seconds()/(60*60) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
vel = dist/interval |
|
except: |
|
vel = 0 |
|
print(f'This is vel {vel}') |
|
return(vel) |
|
|
|
|
|
def get_score_jump (user_dict,user_db): |
|
threshold_vel = { |
|
'H':600, |
|
'M':80, |
|
'L':40, |
|
'frequency':0.1 |
|
} |
|
weight_vel = { |
|
'H':20, |
|
'M':10, |
|
"L":5 |
|
} |
|
many_vel = { |
|
'Y':0.1, |
|
'N':1 |
|
} |
|
|
|
|
|
count_jump = len(user_db[user_db.apply(lambda x: float(x.vel) > threshold_vel['L'] if x.vel != "" else False,axis=1)]) |
|
|
|
if count_jump > len(user_db)*threshold_vel['frequency']: |
|
many_jump = 'Y' |
|
else: |
|
many_jump = 'N' |
|
|
|
vel_txn = float(get_vel_txn(user_dict,user_db)) |
|
|
|
if vel_txn > threshold_vel['H']: |
|
score_jump = many_vel[many_jump] * weight_vel['H'] |
|
elif vel_txn > threshold_vel['M']: |
|
score_jump = many_vel[many_jump] * weight_vel['M'] |
|
elif vel_txn > threshold_vel['L']: |
|
score_jump = many_vel[many_jump] * weight_vel['L'] |
|
else: |
|
score_jump = 0 |
|
return(score_jump) |
|
|
|
|
|
def get_vpn_score (user_dict,user_db): |
|
vpn_fields = ['is_vpn','is_proxy', 'is_tor','is_relay'] |
|
weight_vpn = 10 |
|
vpn_count = 0 |
|
for i in vpn_fields: |
|
vpn_count += user_dict[i] |
|
if check_match_per(user_dict,user_db,check = 'ip')[0] != 'ip_v4' and vpn_count > 0: |
|
return weight_vpn |
|
else: |
|
return 0 |
|
|
|
|
|
def get_risk_score (user_dict,user_db): |
|
weight = {'device_uuid': 40, 'mac_address': 40, 'device_name': 30.0, 'device_model': 20.0, \ |
|
'device_vendor': 4.0, 'ip_v4': 30, 'isp_name': 15.0, 'ip_country': 3.0, 'suburb': 30, 'district': 22.5, \ |
|
'city': 15.0, 'country': 3.0} |
|
device_match,device_per = check_match_per(user_dict,user_db,check='device') |
|
if device_match != '': |
|
device_score = weight[device_match] * device_per |
|
else: |
|
device_score = 0 |
|
|
|
ip_match,ip_per = check_match_per(user_dict,user_db,check='ip') |
|
if ip_match != '': |
|
ip_score = weight[ip_match] * ip_per |
|
else: |
|
ip_score = 0 |
|
|
|
|
|
location_match,location_per = check_match_per(user_dict,user_db,check='location') |
|
if location_match != '': |
|
location_score = weight[location_match] * location_per |
|
else: |
|
location_score = 0 |
|
|
|
jump_score = get_score_jump (user_dict,user_db) |
|
vpn_score = get_vpn_score (user_dict,user_db) |
|
|
|
print(f'device score {device_score}') |
|
print(f'ip_score {ip_score}') |
|
print(f'location_score {location_score}') |
|
print(f'jump_score {jump_score}') |
|
print(f'vpn_score {vpn_score}') |
|
|
|
|
|
total_score = device_score+ip_score+location_score-(jump_score + vpn_score) |
|
score_dict = { |
|
"device_score": device_score, |
|
"ip_score": ip_score, |
|
"location_score": location_score, |
|
"jump_score": jump_score, |
|
"vpn_score": vpn_score, |
|
"total_score": total_score |
|
} |
|
st.session_state['score_dict'] = score_dict |
|
|
|
return total_score |
|
|
|
|
|
def verify_user(user_dict): |
|
|
|
verification = False |
|
|
|
|
|
|
|
username, login_time,\ |
|
device_name, device_uuid, mac_address, device_vendor, device_model, device_ram,\ |
|
ip_v4, ip_country, ip_region, ip_city, ip_lat, ip_lon, isp_name, isp_org,\ |
|
is_vpn, is_proxy, is_tor, is_relay,\ |
|
lat, lon, suburb, district, city, country = get_from_user_dict(user_dict) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
col = ['login_time','device_name', 'device_uuid','mac_address', 'device_vendor', 'device_model', 'device_ram',\ |
|
'ip_v4','ip_country', 'ip_region', 'ip_city', 'ip_lat', 'ip_lon', 'isp_name','isp_org',\ |
|
'is_vpn', 'is_proxy', 'is_tor', 'is_relay', \ |
|
'lat', 'lon','suburb', 'district', 'city', 'country'] |
|
df = get_login_history(username) |
|
|
|
user_db = pd.DataFrame(get_login_history(username)).T |
|
user_db.columns= col |
|
|
|
|
|
if len(user_db) == 0: |
|
print('This is the 1st login time of this username') |
|
verification = True |
|
score_dict = {} |
|
st.session_state['score_dict'] = score_dict |
|
return verification |
|
|
|
|
|
|
|
user_db.login_time = pd.to_datetime(user_db.login_time) |
|
user_db['pre_lat'] = user_db['lat'].shift(periods=1, fill_value=0) |
|
user_db['pre_lon'] = user_db['lon'].shift(periods=1, fill_value=0) |
|
user_db['pre_time'] = user_db['login_time'].shift(periods=1,fill_value=0) |
|
user_db['vel'] = user_db.apply(lambda x: get_vel_all(x),axis=1) |
|
|
|
trust_score =get_risk_score (user_dict,user_db) |
|
risk_threshold = 30 |
|
st.session_state['risk_threshold'] = risk_threshold |
|
print(f'trust_score is {trust_score}') |
|
if trust_score < risk_threshold: |
|
verification = False |
|
else: |
|
verification = True |
|
print(f'verification {verification}') |
|
|
|
|
|
|
|
|
|
return verification |
|
|
|
|
|
|
|
def verify_face(username, img_file_buffer): |
|
face_verification = True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return face_verification |