shaheerawan3's picture
Update views.py
f269140 verified
import pyotp
import time
import streamlit as st
from datetime import datetime
from uuid import uuid4
import logging
from typing import List, Dict, Optional
from utils.security import clear_security_metrics
import bcrypt
import re
from utils.security import (
validate_password,
check_rate_limit,
generate_jwt_token,
verify_jwt_token,
log_security_event
)
from utils.audit import audit_logger
from dashboard_components import dashboard_header, account_linking_form
from typing import Callable
def dashboard_header():
"""
Streamlit version of DashboardHeader.jsx
"""
col1, col2 = st.columns([2, 3])
with col1:
st.title("Digital Inheritance Dashboard")
st.markdown("<span style='color: gray; font-size: 0.8em'>by Muhammad Shaheer</span>",
unsafe_allow_html=True)
with col2:
cols = st.columns(3)
with cols[0]:
if st.button("🏦 Wallets", use_container_width=True):
st.session_state.page = 'wallets'
st.rerun()
with cols[1]:
if st.button("πŸ”— Link Accounts", use_container_width=True):
st.session_state.page = 'link_accounts'
st.rerun()
with cols[2]:
if st.button("πŸšͺ Logout", type="secondary", use_container_width=True):
logout_user()
def show_dashboard():
"""
Updated dashboard view incorporating React components
"""
# Show the header
dashboard_header()
# Get user accounts
accounts = get_user_accounts(st.session_state.user['id'])
if not accounts:
st.info("No digital accounts added yet. Click 'Link Accounts' to get started.")
return
# Display accounts in a modern card layout
st.subheader("Your Digital Accounts")
for account in accounts:
with st.container():
cols = st.columns([1, 2, 1])
with cols[0]:
platform_icons = {
"facebook": "πŸ“˜",
"twitter": "🐦",
"instagram": "πŸ“Έ",
"linkedin": "πŸ’Ό"
}
icon = platform_icons.get(account['platform'].lower(), "πŸ”—")
st.markdown(f"### {icon} {account['platform']}")
st.write(f"**@{account['username']}**")
with cols[1]:
st.write(f"**Status:** {account['status']}")
st.write(f"**Last Activity:** {account['last_activity']}")
if account.get('heir_name'):
st.write(f"**Heir:** {account['heir_name']}")
with cols[2]:
if st.button("Remove Account", key=f"remove_{account['id']}",
type="secondary", use_container_width=True):
if remove_account(account['id']):
st.success("Account removed successfully!")
time.sleep(1)
st.rerun()
def logout_user():
"""Handle user logout."""
try:
# Log the logout event
if 'user' in st.session_state:
log_security_event(
st.session_state.user['id'],
'logout',
f'User {st.session_state.user["username"]} logged out'
)
# Clear metrics and session
clear_security_metrics() # Unregister Prometheus metrics
st.session_state.clear()
st.session_state.page = 'login'
st.rerun()
except Exception as e:
logger.error(f"Logout failed: {str(e)}")
st.error("An error occurred during logout. Please try again.")
def account_linking_form():
"""
Streamlit version of AccountLinking.jsx
"""
st.title("Link Social Media Account")
# Platform selection
platform = st.radio(
"Select Platform",
["Facebook", "Twitter", "Instagram", "LinkedIn"],
horizontal=True,
format_func=lambda x: {
"Facebook": "πŸ“˜ Facebook",
"Twitter": "🐦 Twitter",
"Instagram": "πŸ“Έ Instagram",
"LinkedIn": "πŸ’Ό LinkedIn"
}[x]
)
if platform:
username = st.text_input("Username", placeholder=f"Enter your {platform} username")
# Info message with custom styling
st.info(
"We'll monitor this account for inactivity. "
"Make sure the profile is public so we can track activity."
)
if username:
if st.button("Link Account", type="primary", use_container_width=True):
try:
# Add account linking logic here
success = link_social_account(platform.lower(), username)
if success:
st.success(f"Successfully linked {platform} account: {username}")
time.sleep(1)
st.session_state.page = 'dashboard'
st.rerun()
except Exception as e:
st.error(f"Failed to link account: {str(e)}")
def apply_custom_css():
"""
Apply custom CSS to make the components look more like the original React design
"""
st.markdown("""
<style>
/* Custom styling for the header */
.stButton button {
border-radius: 8px;
padding: 0.5rem 1rem;
border: 1px solid #e5e7eb;
}
/* Style for radio buttons to look more like the original design */
.stRadio > label {
font-weight: 600;
margin-bottom: 1rem;
}
/* Container styling */
.element-container {
background-color: white;
padding: 1rem;
border-radius: 8px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
/* Input styling */
.stTextInput input {
border-radius: 8px;
border: 1px solid #e5e7eb;
padding: 0.5rem;
}
/* Info message styling */
.stAlert {
background-color: #EFF6FF;
color: #1D4ED8;
border-radius: 8px;
padding: 1rem;
}
</style>
""", unsafe_allow_html=True)
# Example usage in your main Streamlit app:
def main():
# Initialize session state
if 'page' not in st.session_state:
st.session_state.page = 'dashboard'
# Apply custom CSS
apply_custom_css()
def logout():
st.session_state.clear()
st.rerun()
# Show header
dashboard_header(logout)
# Show different pages based on state
if st.session_state.page == 'link_accounts':
account_linking_form()
# Add other pages as needed
# Configure logging
logger = logging.getLogger(__name__)
def show_dashboard():
"""Display the main dashboard."""
# Apply custom styling
apply_custom_css()
# Show dashboard header
dashboard_header(lambda: logout_user())
# Get and display accounts
accounts = get_user_accounts(st.session_state.user['id'])
if not accounts:
st.info("No digital accounts added yet. Click 'Add Account' to get started.")
return
def logout_user():
st.session_state.clear()
st.rerun()
def verify_account(platform, username):
"""Verify if a social media account exists and is public."""
# Implementation needed
return True
def link_social_account(platform, username):
# Validate the account exists
account_exists = verify_account(platform, username)
if not account_exists:
st.error("Account not found or not public")
return False
# Store account info
try:
cursor = st.session_state.db.cursor()
account_id = str(uuid4())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO social_accounts (
id, user_id, platform, username,
last_check, last_activity,
created_at, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
account_id,
st.session_state.user['id'],
platform,
username,
current_time,
current_time,
current_time,
'active'
))
st.session_state.db.commit()
return True
except Exception as e:
st.error(f"Failed to link account: {str(e)}")
return False
def link_social_account(platform, username):
# Validate the account exists
account_exists = verify_account(platform, username)
if not account_exists:
st.error("Account not found or not public")
return False
# Store account info
try:
cursor = st.session_state.db.cursor()
account_id = str(uuid4())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO social_accounts (
id, user_id, platform, username,
last_check, last_activity,
created_at, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
account_id,
st.session_state.user['id'],
platform,
username,
current_time,
current_time,
current_time,
'active'
))
st.session_state.db.commit()
return True
except Exception as e:
st.error(f"Failed to link account: {str(e)}")
return False
def get_heir_info(heir_id: str) -> Optional[Dict]:
"""Get information about a specific heir."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("""
SELECT id, name, email, relationship
FROM heirs
WHERE id = ?
""", (heir_id,))
heir = cursor.fetchone()
if heir:
return {
'id': heir[0],
'name': heir[1],
'email': heir[2],
'relationship': heir[3]
}
return None
except Exception as e:
st.error(f"Failed to fetch heir info: {str(e)}")
return None
def get_heirs_list(user_id: str) -> List[Dict]:
"""Get list of heirs for the given user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("""
SELECT id, name, email, relationship
FROM heirs
WHERE user_id = ?
""", (user_id,))
heirs = cursor.fetchall()
return [{'id': h[0], 'name': h[1], 'email': h[2], 'relationship': h[3]} for h in heirs]
except Exception as e:
st.error(f"Failed to fetch heirs: {str(e)}")
return []
def get_user_accounts(user_id: str) -> List[Dict]:
"""Get list of social accounts for the given user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("""
SELECT id, platform, username, status, last_activity, heir_id
FROM social_accounts
WHERE user_id = ?
""", (user_id,))
accounts = cursor.fetchall()
return [{
'id': a[0],
'platform': a[1],
'username': a[2],
'status': a[3],
'last_activity': a[4],
'heir_id': a[5]
} for a in accounts]
except Exception as e:
st.error(f"Failed to fetch accounts: {str(e)}")
return []
def remove_account(account_id: str) -> bool:
"""Remove a social account."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("DELETE FROM social_accounts WHERE id = ?", (account_id,))
st.session_state.db.commit()
st.success("Account removed successfully!")
return True
except Exception as e:
st.error(f"Failed to remove account: {str(e)}")
return False
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
"""Add a new social account with proper validation."""
try:
if not all([platform, username, password, heir_id, days]):
st.error("Please fill in all required fields")
return False
if days < 30:
st.error("Inactivity threshold must be at least 30 days")
return False
cursor = st.session_state.db.cursor()
# Check if heir exists
cursor.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?",
(heir_id, st.session_state.user['id']))
if not cursor.fetchone():
st.error("Selected heir not found")
return False
# Create account
account_id = str(uuid4())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO social_accounts (
id, user_id, platform, username, password,
heir_id, last_check, last_activity,
inactivity_threshold, created_at, last_modified,
status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
account_id,
st.session_state.user['id'],
platform,
username,
password, # Note: In production, this should be encrypted
heir_id,
current_time,
current_time,
days,
current_time,
current_time,
'active'
))
st.session_state.db.commit()
log_security_event(
st.session_state.user['id'],
'account_added',
f'Added {platform} account for user {username}'
)
st.success("Account added successfully!")
return True
except Exception as e:
st.error(f"Failed to add account: {str(e)}")
return False
def add_heir(name: str, email: str, relationship: str, phone: Optional[str] = None) -> bool:
"""Add a new heir."""
try:
if not name or not email or not relationship:
st.error("Please fill in all required fields")
return False
cursor = st.session_state.db.cursor()
heir_id = str(uuid4())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO heirs (id, user_id, name, email, relationship, phone, created_at, last_modified)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
heir_id,
st.session_state.user['id'],
name,
email,
relationship,
phone,
current_time,
current_time
))
st.session_state.db.commit()
st.success("Heir added successfully!")
return True
except Exception as e:
st.error(f"Failed to add heir: {str(e)}")
return False
def register_user(username: str, email: str, password: str, confirm: str) -> bool:
"""Register a new user with validation."""
if password != confirm:
st.error("Passwords don't match")
return False
valid, msg = validate_password(password)
if not valid:
st.error(msg)
return False
if not re.match(r"[^@]+@[^@]+\.[^@]+", email):
st.error("Invalid email format")
return False
try:
cursor = st.session_state.db.cursor()
# Check existing user
cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email))
if cursor.fetchone():
st.error("Username or email already exists")
return False
# Create user
user_id = str(uuid4())
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO users (id, username, email, password, created_at, last_password_change)
VALUES (?, ?, ?, ?, ?, ?)
""", (
user_id,
username,
email,
hashed.decode(),
current_time,
current_time
))
st.session_state.db.commit()
st.success("Registration successful! Please login.")
return True
except Exception as e:
st.error(f"Registration failed: {str(e)}")
return False
def login_user(username: str, password: str) -> bool:
"""Authenticate a user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("SELECT id, password, email FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
if not user:
st.error("Invalid username or password")
return False
if bcrypt.checkpw(password.encode(), user[1].encode()):
st.session_state.user = {
'id': user[0],
'username': username,
'email': user[2]
}
st.session_state.page = 'dashboard'
token = generate_jwt_token(user[0])
st.session_state.token = token
log_security_event(
user[0],
'login',
f'Successful login for user {username}'
)
return True
st.error("Invalid username or password")
return False
except Exception as e:
st.error(f"Login failed: {str(e)}")
return False
def get_2fa_status(user_id: str) -> bool:
"""Check if 2FA is enabled for a user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("SELECT totp_secret FROM users WHERE id = ?", (user_id,))
result = cursor.fetchone()
return bool(result and result[0])
except Exception as e:
st.error(f"Failed to check 2FA status: {str(e)}")
return False
def enable_2fa_for_user(user_id: str, totp_secret: str) -> bool:
"""Enable 2FA for a user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute(
"UPDATE users SET totp_secret = ? WHERE id = ?",
(totp_secret, user_id)
)
st.session_state.db.commit()
return True
except Exception as e:
st.error(f"Failed to enable 2FA: {str(e)}")
return False
def change_password(current: str, new_pass: str, confirm: str) -> bool:
"""Change user password with validation."""
try:
if new_pass != confirm:
st.error("New passwords don't match")
return False
valid, msg = validate_password(new_pass)
if not valid:
st.error(msg)
return False
cursor = st.session_state.db.cursor()
cursor.execute(
"SELECT password FROM users WHERE id = ?",
(st.session_state.user['id'],)
)
stored_hash = cursor.fetchone()[0]
if not bcrypt.checkpw(current.encode(), stored_hash.encode()):
st.error("Current password is incorrect")
return False
new_hash = bcrypt.hashpw(new_pass.encode(), bcrypt.gensalt())
cursor.execute(
"UPDATE users SET password = ?, last_password_change = ? WHERE id = ?",
(new_hash.decode(), datetime.now().isoformat(), st.session_state.user['id'])
)
st.session_state.db.commit()
st.success("Password changed successfully!")
return True
except Exception as e:
st.error(f"Failed to change password: {str(e)}")
return False
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Page display functions
def show_login_page():
"""Display the login/register page."""
st.title("Digital Inheritance System")
tab1, tab2 = st.tabs(["Login", "Register"])
with tab1:
with st.form("login_form", clear_on_submit=True):
username = st.text_input("Username")
password = st.text_input("Password", type="password")
submitted = st.form_submit_button("Login")
if submitted and username and password:
if check_rate_limit(username):
st.error("Too many login attempts. Please try again later.")
return
if login_user(username, password):
st.success("Login successful!")
time.sleep(1) # Give time for message to show
st.rerun()
with tab2:
with st.form("register_form", clear_on_submit=True):
col1, col2 = st.columns(2)
with col1:
reg_username = st.text_input(
"Username",
help="Choose a unique username"
)
reg_email = st.text_input(
"Email",
help="Enter your email address"
)
with col2:
reg_password = st.text_input(
"Password",
type="password",
help="Choose a strong password"
)
reg_confirm = st.text_input(
"Confirm Password",
type="password",
help="Re-enter your password"
)
# Password requirements info
st.markdown("""
Password must contain:
- At least 8 characters
- At least one uppercase letter
- At least one lowercase letter
- At least one number
- At least one special character
""")
submitted = st.form_submit_button("Register")
if submitted:
if register_user(reg_username, reg_email, reg_password, reg_confirm):
st.success("Registration successful! Please login.")
time.sleep(2) # Give time for message to show
st.rerun()
def get_user_accounts(user_id: str) -> List[Dict]:
"""Get list of social accounts for the given user."""
try:
cursor = st.session_state.db.cursor()
cursor.execute("""
SELECT
sa.id,
sa.platform,
sa.username,
sa.status,
sa.last_activity,
sa.heir_id,
h.name as heir_name,
h.relationship as heir_relationship
FROM social_accounts sa
LEFT JOIN heirs h ON sa.heir_id = h.id
WHERE sa.user_id = ?
ORDER BY sa.last_activity DESC
""", (user_id,))
accounts = []
for row in cursor.fetchall():
accounts.append({
'id': row[0],
'platform': row[1],
'username': row[2],
'status': row[3],
'last_activity': row[4],
'heir_id': row[5],
'heir_name': row[6],
'heir_relationship': row[7]
})
return accounts
except Exception as e:
st.error(f"Failed to fetch accounts: {str(e)}")
return []
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
"""Add a new social account with proper validation."""
try:
if not all([platform, username, password, heir_id, days]):
st.error("Please fill in all required fields")
return False
if days < 30:
st.error("Inactivity threshold must be at least 30 days")
return False
cursor = st.session_state.db.cursor()
# Check if account already exists
cursor.execute("""
SELECT id FROM social_accounts
WHERE platform = ? AND username = ? AND user_id = ?
""", (platform, username, st.session_state.user['id']))
if cursor.fetchone():
st.error("This account is already linked")
return False
# Create account
account_id = str(uuid4())
current_time = datetime.now().isoformat()
cursor.execute("""
INSERT INTO social_accounts (
id, user_id, platform, username, password,
heir_id, last_check, last_activity,
inactivity_threshold, created_at, last_modified,
status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
account_id,
st.session_state.user['id'],
platform,
username,
password, # In production, encrypt this
heir_id,
current_time,
current_time,
days,
current_time,
current_time,
'active'
))
st.session_state.db.commit()
log_security_event(
st.session_state.user['id'],
'account_added',
f'Added {platform} account for user {username}'
)
st.success(f"Account {username} on {platform} added successfully!")
time.sleep(1) # Give time for message to show
return True
except Exception as e:
st.error(f"Failed to add account: {str(e)}")
return False
def show_dashboard():
"""Display the main dashboard."""
st.title("Digital Inheritance Dashboard")
# Add navigation menu at top
menu_col1, menu_col2, menu_col3, menu_col4 = st.columns([2,1,1,1])
with menu_col1:
st.subheader(f"Welcome, {st.session_state.user['username']}")
with menu_col2:
if st.button("Add Account", key="add_account_btn", type="primary"):
st.session_state.page = 'add_account'
st.rerun()
with menu_col3:
if st.button("Manage Heirs", key="manage_heirs_btn"):
st.session_state.page = 'manage_heirs'
st.rerun()
with menu_col4:
if st.button("Logout", key="logout_btn", type="secondary"):
logout_user()
# Get and display accounts
accounts = get_user_accounts(st.session_state.user['id'])
if not accounts:
st.info("No digital accounts added yet. Click 'Add Account' to get started.")
return
st.subheader("Your Digital Accounts")
# Use columns for better layout
for account in accounts:
with st.container():
col1, col2, col3 = st.columns([3,3,1])
with col1:
st.markdown(f"### {account['platform']} - {account['username']}")
st.write(f"**Status:** {account['status'].title()}")
last_activity = datetime.fromisoformat(account['last_activity']).strftime("%Y-%m-%d %H:%M")
st.write(f"**Last Activity:** {last_activity}")
with col2:
heir = get_heir_info(account['heir_id'])
if heir:
st.write(f"**Heir:** {heir['name']}")
st.write(f"**Relationship:** {heir['relationship']}")
if heir.get('email'):
st.write(f"**Contact:** {heir['email']}")
with col3:
if st.button("Remove", key=f"remove_{account['id']}", type="secondary"):
if remove_account(account['id']):
st.success("Account removed successfully!")
time.sleep(1) # Give time for message to show
st.rerun()
def show_add_account_page():
"""Display the add account page."""
st.title("Add Digital Account")
if st.button("← Back"):
st.session_state.page = 'dashboard'
st.rerun()
heirs = get_heirs_list(st.session_state.user['id'])
if not heirs:
st.warning("Please add at least one heir before adding accounts.")
if st.button("Add Heir"):
st.session_state.page = 'manage_heirs'
st.rerun()
return
with st.form("add_account_form", clear_on_submit=True):
col1, col2 = st.columns(2)
with col1:
platform = st.selectbox(
"Platform",
["Facebook", "Twitter", "Instagram", "LinkedIn"],
help="Select the social media platform"
)
username = st.text_input(
"Username",
help="Enter your username for the selected platform"
)
with col2:
password = st.text_input(
"Password",
type="password",
help="Enter your account password"
)
heir = st.selectbox(
"Select Heir",
options=heirs,
format_func=lambda x: f"{x['name']} ({x['relationship']})",
help="Choose the heir for this account"
)
days = st.slider(
"Inactivity Threshold (days)",
min_value=30,
max_value=365,
value=90,
help="Number of days of inactivity before notifying heir"
)
submitted = st.form_submit_button("Add Account", use_container_width=True)
if submitted:
if heir:
add_account(platform, username, password, heir['id'], days)
else:
st.error("Please select an heir")
def show_heir_management_page():
"""Display the heir management page."""
st.title("Manage Heirs")
if st.button("← Back"):
st.session_state.page = 'dashboard'
st.rerun()
tab1, tab2 = st.tabs(["View Heirs", "Add Heir"])
with tab1:
heirs = get_heirs_list(st.session_state.user['id'])
if not heirs:
st.info("No heirs added yet. Use the 'Add Heir' tab to get started.")
else:
for heir in heirs:
with st.expander(f"{heir['name']} - {heir['relationship']}", expanded=True):
col1, col2 = st.columns([3,1])
with col1:
st.write(f"**Email:** {heir['email']}")
if heir.get('phone'):
st.write(f"**Phone:** {heir['phone']}")
with col2:
if st.button("Remove", key=f"remove_heir_{heir['id']}", type="secondary"):
remove_heir(heir['id'])
st.rerun()
with tab2:
with st.form("add_heir_form", clear_on_submit=True):
col1, col2 = st.columns(2)
with col1:
name = st.text_input(
"Full Name",
help="Enter the heir's full name"
)
email = st.text_input(
"Email",
help="Enter the heir's email address"
)
with col2:
relationship = st.selectbox(
"Relationship",
["Family", "Friend", "Legal Representative", "Other"],
help="Select your relationship with the heir"
)
phone = st.text_input(
"Phone (Optional)",
help="Enter the heir's phone number (optional)"
)
submitted = st.form_submit_button("Add Heir", use_container_width=True)
if submitted:
add_heir(name, email, relationship, phone)
def show_security_settings_page():
"""Display the security settings page."""
st.title("Security Settings")
if st.button("← Back"):
st.session_state.page = 'dashboard'
st.rerun()
tab1, tab2 = st.tabs(["Two-Factor Authentication", "Change Password"])
with tab1:
current_2fa_status = get_2fa_status(st.session_state.user['id'])
st.write("Current Status:", "Enabled" if current_2fa_status else "Disabled")
if not current_2fa_status:
if st.button("Set Up 2FA"):
st.session_state.setup_2fa = True
st.rerun()
if hasattr(st.session_state, 'setup_2fa'):
setup_2fa()
with tab2:
with st.form("change_password_form", clear_on_submit=True):
current_password = st.text_input(
"Current Password",
type="password",
help="Enter your current password"
)
new_password = st.text_input(
"New Password",
type="password",
help="Enter your new password"
)
confirm_password = st.text_input(
"Confirm New Password",
type="password",
help="Confirm your new password"
)
submitted = st.form_submit_button("Change Password", use_container_width=True)
if submitted:
change_password(current_password, new_password, confirm_password)
def setup_2fa():
"""Set up two-factor authentication for a user."""
if 'totp_secret' not in st.session_state:
st.session_state.totp_secret = pyotp.random_base32()
totp = pyotp.TOTP(st.session_state.totp_secret)
provisioning_uri = totp.provisioning_uri(
st.session_state.user['email'],
issuer_name="Digital Heir System"
)
st.write("1. Scan this QR code with your authenticator app:")
st.code(provisioning_uri) # Since we can't generate QR codes directly
st.write("2. Enter the code from your authenticator app:")
code = st.text_input("Verification Code", key="2fa_code")
verify_col1, verify_col2 = st.columns([1,3])
with verify_col1:
if st.button("Verify Code"):
if totp.verify(code):
if enable_2fa_for_user(st.session_state.user['id'], st.session_state.totp_secret):
st.success("2FA enabled successfully!")
del st.session_state.totp_secret
del st.session_state.setup_2fa
st.rerun()
else:
st.error("Invalid code")
with verify_col2:
if st.button("Cancel"):
del st.session_state.totp_secret
del st.session_state.setup_2fa
st.rerun()
def check_account_activity(platform, username):
"""Check social media account activity."""
if platform == 'twitter':
url = f"https://nitter.net/{username}/rss"
# Parse RSS feed for latest activity
pass
elif platform == 'facebook':
url = f"https://www.facebook.com/{username}"
# Implementation needed
pass
def remove_heir(heir_id: str) -> bool:
"""Remove an heir and their associated accounts."""
try:
cursor = st.session_state.db.cursor()
# First check if there are any accounts associated with this heir
cursor.execute("""
SELECT COUNT(*) FROM social_accounts
WHERE heir_id = ? AND user_id = ?
""", (heir_id, st.session_state.user['id']))
account_count = cursor.fetchone()[0]
if account_count > 0:
st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
return False
# If no accounts are associated, remove the heir
cursor.execute("""
DELETE FROM heirs
WHERE id = ? AND user_id = ?
""", (heir_id, st.session_state.user['id']))
st.session_state.db.commit()
st.success("Heir removed successfully!")
return True
except Exception as e:
st.error(f"Failed to remove heir: {str(e)}")
return False
def check_account_activity(platform, username):
if platform == 'twitter':
url = f"https://nitter.net/{username}/rss"
# Parse RSS feed for latest activity
elif platform == 'facebook':
url = f"https://www.facebook.com/{username}"
def remove_heir(heir_id: str) -> bool:
"""Remove an heir and their associated accounts."""
try:
cursor = st.session_state.db.cursor()
# First check if there are any accounts associated with this heir
cursor.execute("""
SELECT COUNT(*) FROM social_accounts
WHERE heir_id = ? AND user_id = ?
""", (heir_id, st.session_state.user['id']))
account_count = cursor.fetchone()[0]
if account_count > 0:
st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
return False
# If no accounts are associated, remove the heir
cursor.execute("""
DELETE FROM heirs
WHERE id = ? AND user_id = ?
""", (heir_id, st.session_state.user['id']))
st.session_state.db.commit()
st.success("Heir removed successfully!")
return True
except Exception as e:
st.error(f"Failed to remove heir: {str(e)}")
return False
def initialize_database():
"""
Initialize database tables needed for the application
"""
try:
cursor = st.session_state.db.cursor()
# Create login_attempt_history table instead of login_attempts
cursor.execute("""
CREATE TABLE IF NOT EXISTS login_attempt_history (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
attempt_time TEXT NOT NULL,
ip_address TEXT,
success BOOLEAN DEFAULT FALSE
)
""")
# Create other necessary tables if they don't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS social_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
username TEXT NOT NULL,
status TEXT DEFAULT 'active',
last_activity TEXT,
heir_id TEXT,
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (heir_id) REFERENCES heirs (id)
)
""")
st.session_state.db.commit()
except Exception as e:
logger.error(f"Database initialization failed: {str(e)}")
st.error("Failed to initialize database. Please contact support.")
def initialize_services():
"""Initialize required services."""
if hasattr(st.session_state, 'db'):
audit_logger.initialize_with_connection(st.session_state.db)
# Main initialization function
def initialize_app():
"""Initialize the application."""
st.set_page_config(
page_title="DIGITAL INHERITANCE SYSTEM",
page_icon="πŸ”",
layout="wide"
)
if 'db' in st.session_state:
initialize_database()
if 'page' not in st.session_state:
st.session_state.page = 'login'
initialize_services()