Spaces:
Build error
Build error
| import pyotp | |
| import time | |
| import streamlit as st | |
| from datetime import datetime | |
| from uuid import uuid4 | |
| import logging | |
| from typing import List, Dict, Optional | |
| from utils.security import clear_security_metrics | |
| import bcrypt | |
| import re | |
| from utils.security import ( | |
| validate_password, | |
| check_rate_limit, | |
| generate_jwt_token, | |
| verify_jwt_token, | |
| log_security_event | |
| ) | |
| from utils.audit import audit_logger | |
| from dashboard_components import dashboard_header, account_linking_form | |
| from typing import Callable | |
def dashboard_header(on_logout: Optional[Callable[[], None]] = None):
    """
    Streamlit version of DashboardHeader.jsx

    Renders the dashboard title and byline on the left and three navigation
    buttons (Wallets, Link Accounts, Logout) on the right.

    Args:
        on_logout: Optional zero-argument callback run when the Logout button
            is pressed. Other functions in this module call the header with
            such a callback, so it has to be accepted; when omitted, the
            module-level ``logout_user`` is used.
    """
    title_col, nav_col = st.columns([2, 3])

    with title_col:
        st.title("Digital Inheritance Dashboard")
        st.markdown("<span style='color: gray; font-size: 0.8em'>by Muhammad Shaheer</span>",
                    unsafe_allow_html=True)

    with nav_col:
        cols = st.columns(3)

        with cols[0]:
            if st.button("π¦ Wallets", use_container_width=True):
                st.session_state.page = 'wallets'
                st.rerun()

        with cols[1]:
            if st.button("π Link Accounts", use_container_width=True):
                st.session_state.page = 'link_accounts'
                st.rerun()

        with cols[2]:
            if st.button("πͺ Logout", type="secondary", use_container_width=True):
                # Prefer the caller-supplied handler; fall back to the default.
                if on_logout is not None:
                    on_logout()
                else:
                    logout_user()
def show_dashboard():
    """
    Card-style dashboard view modelled on the React components.

    Draws the header, then one three-column card per linked account
    (platform and username, status details, remove button). When the user
    has no accounts, an info hint is shown instead.
    """
    dashboard_header()

    user_accounts = get_user_accounts(st.session_state.user['id'])
    if not user_accounts:
        st.info("No digital accounts added yet. Click 'Link Accounts' to get started.")
        return

    st.subheader("Your Digital Accounts")

    # Icon lookup keyed by lower-cased platform name.
    platform_icons = {
        "facebook": "π",
        "twitter": "π¦",
        "instagram": "πΈ",
        "linkedin": "πΌ"
    }

    for acct in user_accounts:
        with st.container():
            icon_col, detail_col, action_col = st.columns([1, 2, 1])

            with icon_col:
                icon = platform_icons.get(acct['platform'].lower(), "π")
                st.markdown(f"### {icon} {acct['platform']}")
                st.write(f"**@{acct['username']}**")

            with detail_col:
                st.write(f"**Status:** {acct['status']}")
                st.write(f"**Last Activity:** {acct['last_activity']}")
                # The heir line is only shown when the record carries a name.
                if acct.get('heir_name'):
                    st.write(f"**Heir:** {acct['heir_name']}")

            with action_col:
                remove_clicked = st.button(
                    "Remove Account",
                    key=f"remove_{acct['id']}",
                    type="secondary",
                    use_container_width=True,
                )
                if remove_clicked and remove_account(acct['id']):
                    st.success("Account removed successfully!")
                    time.sleep(1)
                    st.rerun()
def logout_user():
    """Handle user logout: audit the event, clear metrics and session, rerun."""
    try:
        session = st.session_state

        # Record the logout only when somebody is actually signed in.
        if 'user' in session:
            log_security_event(
                session.user['id'],
                'logout',
                f'User {session.user["username"]} logged out'
            )

        clear_security_metrics()  # Unregister Prometheus metrics
        session.clear()
        session.page = 'login'
        st.rerun()
    except Exception as e:
        logger.error(f"Logout failed: {str(e)}")
        st.error("An error occurred during logout. Please try again.")
def account_linking_form():
    """
    Streamlit version of AccountLinking.jsx

    Lets the user pick a platform, type a username and link the account via
    ``link_social_account``. On success the app returns to the dashboard.
    """
    st.title("Link Social Media Account")

    # Display labels for the platform radio options.
    option_labels = {
        "Facebook": "π Facebook",
        "Twitter": "π¦ Twitter",
        "Instagram": "πΈ Instagram",
        "LinkedIn": "πΌ LinkedIn"
    }
    platform = st.radio(
        "Select Platform",
        ["Facebook", "Twitter", "Instagram", "LinkedIn"],
        horizontal=True,
        format_func=lambda x: option_labels[x]
    )
    if not platform:
        return

    username = st.text_input("Username", placeholder=f"Enter your {platform} username")

    st.info(
        "We'll monitor this account for inactivity. "
        "Make sure the profile is public so we can track activity."
    )

    # The link button is only offered once a username has been typed.
    if not username:
        return
    if not st.button("Link Account", type="primary", use_container_width=True):
        return

    try:
        # The platform is passed on in lower case.
        if link_social_account(platform.lower(), username):
            st.success(f"Successfully linked {platform} account: {username}")
            time.sleep(1)
            st.session_state.page = 'dashboard'
            st.rerun()
    except Exception as e:
        st.error(f"Failed to link account: {str(e)}")
def apply_custom_css():
    """
    Inject the custom stylesheet that approximates the original React design.

    The CSS targets Streamlit's buttons, radio labels, element containers,
    text inputs and alert boxes, and is emitted through ``st.markdown`` with
    HTML enabled.
    """
    stylesheet = """
    <style>
    /* Custom styling for the header */
    .stButton button {
        border-radius: 8px;
        padding: 0.5rem 1rem;
        border: 1px solid #e5e7eb;
    }
    /* Style for radio buttons to look more like the original design */
    .stRadio > label {
        font-weight: 600;
        margin-bottom: 1rem;
    }
    /* Container styling */
    .element-container {
        background-color: white;
        padding: 1rem;
        border-radius: 8px;
        box-shadow: 0 1px 3px rgba(0,0,0,0.1);
    }
    /* Input styling */
    .stTextInput input {
        border-radius: 8px;
        border: 1px solid #e5e7eb;
        padding: 0.5rem;
    }
    /* Info message styling */
    .stAlert {
        background-color: #EFF6FF;
        color: #1D4ED8;
        border-radius: 8px;
        padding: 1rem;
    }
    </style>
    """
    st.markdown(stylesheet, unsafe_allow_html=True)
# Example usage in your main Streamlit app:
def main():
    """Example entry point: style the page, draw the header, route by page."""
    # First visit: land on the dashboard.
    if 'page' not in st.session_state:
        st.session_state.page = 'dashboard'

    apply_custom_css()

    def logout():
        """Wipe the whole session and restart the script."""
        st.session_state.clear()
        st.rerun()

    # The header receives the logout handler defined above.
    dashboard_header(logout)

    # Only the account-linking page is routed so far; add other pages as needed.
    current_page = st.session_state.page
    if current_page == 'link_accounts':
        account_linking_form()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def show_dashboard():
    """Display the main dashboard: styling, header, and an empty-state hint."""
    apply_custom_css()

    def _on_logout():
        # Resolved at click time, so the current logout_user is used.
        return logout_user()

    dashboard_header(_on_logout)

    # Fetch the signed-in user's accounts; nothing further is rendered here
    # beyond the empty-state message.
    user_accounts = get_user_accounts(st.session_state.user['id'])
    if not user_accounts:
        st.info("No digital accounts added yet. Click 'Add Account' to get started.")
        return
def logout_user():
    """
    Log the current user out and restart the app.

    Because this definition comes later in the module, it is the one that is
    actually called. It therefore records the logout security event and
    unregisters the security metrics before clearing the session, so those
    steps are not silently lost. Both steps are best-effort: a failure is
    logged and never prevents the session from being cleared.
    """
    try:
        if 'user' in st.session_state:
            log_security_event(
                st.session_state.user['id'],
                'logout',
                f'User {st.session_state.user["username"]} logged out'
            )
        clear_security_metrics()  # Unregister Prometheus metrics
    except Exception as e:
        logger.error(f"Logout failed: {str(e)}")

    # Always drop the session, even if auditing failed.
    st.session_state.clear()
    st.rerun()
def verify_account(platform, username):
    """
    Report whether a social media account exists and is public.

    Placeholder: no lookup is performed yet, so every platform/username
    pair is accepted.
    """
    # TODO: implement the real existence/visibility check.
    return True
def link_social_account(platform, username):
    """
    Link a social media account to the logged-in user.

    The account is first checked with ``verify_account``; it is then rejected
    if the same platform/username pair is already linked for this user, and
    otherwise stored in ``social_accounts`` with status ``'active'`` and the
    current time as its check, activity and creation timestamps.

    Args:
        platform: Platform identifier as it should be stored.
        username: Account name on that platform.

    Returns:
        True when a new row was inserted and committed, False otherwise
        (an error message is shown in the UI in that case).
    """
    # Validate the account exists
    if not verify_account(platform, username):
        st.error("Account not found or not public")
        return False

    try:
        cursor = st.session_state.db.cursor()
        user_id = st.session_state.user['id']

        # Refuse to create a second row for an account that is already linked.
        cursor.execute("""
            SELECT id FROM social_accounts
            WHERE platform = ? AND username = ? AND user_id = ?
        """, (platform, username, user_id))
        if cursor.fetchone():
            st.error("This account is already linked")
            return False

        # Store account info
        account_id = str(uuid4())
        current_time = datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username,
                last_check, last_activity,
                created_at, status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            account_id,
            user_id,
            platform,
            username,
            current_time,
            current_time,
            current_time,
            'active'
        ))
        st.session_state.db.commit()
        return True
    except Exception as e:
        st.error(f"Failed to link account: {str(e)}")
        return False
def get_heir_info(heir_id: str) -> Optional[Dict]:
    """
    Look up a single heir by id.

    Returns a dict with ``id``, ``name``, ``email`` and ``relationship``, or
    None when no such heir exists or the query fails (the failure is shown
    in the UI).
    """
    try:
        cur = st.session_state.db.cursor()
        cur.execute("""
            SELECT id, name, email, relationship
            FROM heirs
            WHERE id = ?
        """, (heir_id,))
        row = cur.fetchone()
        if not row:
            return None
        return {
            'id': row[0],
            'name': row[1],
            'email': row[2],
            'relationship': row[3]
        }
    except Exception as e:
        st.error(f"Failed to fetch heir info: {str(e)}")
        return None
def get_heirs_list(user_id: str) -> List[Dict]:
    """
    Get list of heirs for the given user.

    Each heir is a dict with ``id``, ``name``, ``email``, ``relationship``
    and ``phone``. The phone number is included because ``add_heir`` stores
    it and the heir management page displays it when present.

    Returns an empty list when the user has no heirs or the query fails
    (the failure is shown in the UI).
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("""
            SELECT id, name, email, relationship, phone
            FROM heirs
            WHERE user_id = ?
        """, (user_id,))
        return [
            {
                'id': h[0],
                'name': h[1],
                'email': h[2],
                'relationship': h[3],
                'phone': h[4],
            }
            for h in cursor.fetchall()
        ]
    except Exception as e:
        st.error(f"Failed to fetch heirs: {str(e)}")
        return []
def get_user_accounts(user_id: str) -> List[Dict]:
    """
    Fetch the social accounts that belong to ``user_id``.

    Every account is returned as a dict with ``id``, ``platform``,
    ``username``, ``status``, ``last_activity`` and ``heir_id``. An empty
    list is returned when there are none or when the query fails.
    """
    try:
        cur = st.session_state.db.cursor()
        cur.execute("""
            SELECT id, platform, username, status, last_activity, heir_id
            FROM social_accounts
            WHERE user_id = ?
        """, (user_id,))

        result = []
        for row in cur.fetchall():
            result.append({
                'id': row[0],
                'platform': row[1],
                'username': row[2],
                'status': row[3],
                'last_activity': row[4],
                'heir_id': row[5]
            })
        return result
    except Exception as e:
        st.error(f"Failed to fetch accounts: {str(e)}")
        return []
def remove_account(account_id: str) -> bool:
    """
    Remove one of the current user's social accounts.

    The DELETE is scoped to the logged-in user's id, matching how heirs are
    removed, so an account id belonging to someone else is never deleted.

    Returns:
        True when the statement ran and was committed, False when an error
        occurred (the error is shown in the UI).
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute(
            "DELETE FROM social_accounts WHERE id = ? AND user_id = ?",
            (account_id, st.session_state.user['id'])
        )
        st.session_state.db.commit()
        st.success("Account removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove account: {str(e)}")
        return False
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
    """
    Store a new social account after validating the input.

    All fields are required, ``days`` must be at least 30, and the heir must
    belong to the logged-in user. On success a security event is logged.
    Returns True on success and False otherwise (with a UI message).
    """
    try:
        required = [platform, username, password, heir_id, days]
        if not all(required):
            st.error("Please fill in all required fields")
            return False

        if days < 30:
            st.error("Inactivity threshold must be at least 30 days")
            return False

        cur = st.session_state.db.cursor()

        # The heir has to exist and be owned by the current user.
        cur.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?",
                    (heir_id, st.session_state.user['id']))
        heir_row = cur.fetchone()
        if not heir_row:
            st.error("Selected heir not found")
            return False

        new_id = str(uuid4())
        now = datetime.now().isoformat()
        values = (
            new_id,
            st.session_state.user['id'],
            platform,
            username,
            password,  # Note: In production, this should be encrypted
            heir_id,
            now,
            now,
            days,
            now,
            now,
            'active'
        )
        cur.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username, password,
                heir_id, last_check, last_activity,
                inactivity_threshold, created_at, last_modified,
                status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        st.session_state.db.commit()

        log_security_event(
            st.session_state.user['id'],
            'account_added',
            f'Added {platform} account for user {username}'
        )
        st.success("Account added successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to add account: {str(e)}")
        return False
def add_heir(name: str, email: str, relationship: str, phone: Optional[str] = None) -> bool:
    """
    Create an heir record for the logged-in user.

    ``name``, ``email`` and ``relationship`` are mandatory; ``phone`` is
    stored as given. Returns True once the row is committed, False on
    missing input or a database error (a UI message is shown either way).
    """
    try:
        if not (name and email and relationship):
            st.error("Please fill in all required fields")
            return False

        cur = st.session_state.db.cursor()
        new_heir_id = str(uuid4())
        now = datetime.now().isoformat()
        row = (
            new_heir_id,
            st.session_state.user['id'],
            name,
            email,
            relationship,
            phone,
            now,
            now
        )
        cur.execute("""
            INSERT INTO heirs (id, user_id, name, email, relationship, phone, created_at, last_modified)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        st.session_state.db.commit()

        st.success("Heir added successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to add heir: {str(e)}")
        return False
def register_user(username: str, email: str, password: str, confirm: str) -> bool:
    """
    Register a new user with validation.

    Checks, in order: all fields present, passwords match, password passes
    ``validate_password``, the email has a plausible shape, and neither the
    username nor the email is already taken. The password is stored as a
    bcrypt hash.

    Returns:
        True when the user row was committed, False on any validation or
        database failure (a UI message is shown).
    """
    # An empty username or email would otherwise be stored as a real account.
    if not username or not email:
        st.error("Please fill in all required fields")
        return False

    if password != confirm:
        st.error("Passwords don't match")
        return False

    valid, msg = validate_password(password)
    if not valid:
        st.error(msg)
        return False

    # fullmatch: the whole value must be an address; match() only anchors
    # the start and would accept trailing junk such as "a@b.c@x".
    if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
        st.error("Invalid email format")
        return False

    try:
        cursor = st.session_state.db.cursor()

        # Check existing user
        cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email))
        if cursor.fetchone():
            st.error("Username or email already exists")
            return False

        # Create user
        user_id = str(uuid4())
        hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
        current_time = datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO users (id, username, email, password, created_at, last_password_change)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            user_id,
            username,
            email,
            hashed.decode(),
            current_time,
            current_time
        ))
        st.session_state.db.commit()

        st.success("Registration successful! Please login.")
        return True
    except Exception as e:
        st.error(f"Registration failed: {str(e)}")
        return False
def login_user(username: str, password: str) -> bool:
    """
    Authenticate a user against the stored bcrypt hash.

    On success the user record, target page and a JWT are placed in the
    session and a login event is logged. Returns True on success, False on
    bad credentials or an error.
    """
    # NOTE(review): the stored TOTP secret is not consulted here - confirm
    # whether a second-factor check is expected during login.
    try:
        cur = st.session_state.db.cursor()
        cur.execute("SELECT id, password, email FROM users WHERE username = ?", (username,))
        record = cur.fetchone()

        if not record:
            st.error("Invalid username or password")
            return False

        if not bcrypt.checkpw(password.encode(), record[1].encode()):
            st.error("Invalid username or password")
            return False

        st.session_state.user = {
            'id': record[0],
            'username': username,
            'email': record[2]
        }
        st.session_state.page = 'dashboard'

        st.session_state.token = generate_jwt_token(record[0])

        log_security_event(
            record[0],
            'login',
            f'Successful login for user {username}'
        )
        return True
    except Exception as e:
        st.error(f"Login failed: {str(e)}")
        return False
def get_2fa_status(user_id: str) -> bool:
    """Return True when the user has a non-empty TOTP secret stored."""
    try:
        cur = st.session_state.db.cursor()
        cur.execute("SELECT totp_secret FROM users WHERE id = ?", (user_id,))
        row = cur.fetchone()
        # No row, or an empty/NULL secret, both mean 2FA is off.
        if not row:
            return False
        return bool(row[0])
    except Exception as e:
        st.error(f"Failed to check 2FA status: {str(e)}")
        return False
def enable_2fa_for_user(user_id: str, totp_secret: str) -> bool:
    """
    Persist ``totp_secret`` on the user's row.

    Returns True after the update is committed, False when the database
    operation fails (the error is shown in the UI).
    """
    update_sql = "UPDATE users SET totp_secret = ? WHERE id = ?"
    try:
        connection = st.session_state.db
        connection.cursor().execute(update_sql, (totp_secret, user_id))
        connection.commit()
        return True
    except Exception as e:
        st.error(f"Failed to enable 2FA: {str(e)}")
        return False
def change_password(current: str, new_pass: str, confirm: str) -> bool:
    """
    Change user password with validation.

    The new password must match its confirmation and pass
    ``validate_password``; the current password must match the stored bcrypt
    hash. On success the new hash and the change timestamp are saved.

    Returns:
        True when the password was updated, False on validation failure,
        a missing user row, or a database error (a UI message is shown).
    """
    try:
        if new_pass != confirm:
            st.error("New passwords don't match")
            return False

        valid, msg = validate_password(new_pass)
        if not valid:
            st.error(msg)
            return False

        user_id = st.session_state.user['id']
        cursor = st.session_state.db.cursor()
        cursor.execute(
            "SELECT password FROM users WHERE id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        # A session can outlive its user row; report that explicitly instead
        # of surfacing a "NoneType is not subscriptable" message.
        if not row:
            st.error("User account not found")
            return False
        stored_hash = row[0]

        if not bcrypt.checkpw(current.encode(), stored_hash.encode()):
            st.error("Current password is incorrect")
            return False

        new_hash = bcrypt.hashpw(new_pass.encode(), bcrypt.gensalt())
        cursor.execute(
            "UPDATE users SET password = ?, last_password_change = ? WHERE id = ?",
            (new_hash.decode(), datetime.now().isoformat(), user_id)
        )
        st.session_state.db.commit()

        st.success("Password changed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to change password: {str(e)}")
        return False
# Configure root logging at INFO and bind this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Page display functions
def show_login_page():
    """
    Display the login/register page.

    The page has two tabs. The Login tab checks the rate limiter, then
    authenticates through ``login_user``. The Register tab collects the
    account details and calls ``register_user``. Both tabs are always
    rendered, whatever the outcome of a login attempt.
    """
    st.title("Digital Inheritance System")

    login_tab, register_tab = st.tabs(["Login", "Register"])

    with login_tab:
        with st.form("login_form", clear_on_submit=True):
            username = st.text_input("Username")
            password = st.text_input("Password", type="password")
            submitted = st.form_submit_button("Login")

            if submitted and username and password:
                # check_rate_limit is treated as "True means blocked".
                # No early return here: returning would skip rendering the
                # Register tab for this run.
                if check_rate_limit(username):
                    st.error("Too many login attempts. Please try again later.")
                elif login_user(username, password):
                    st.success("Login successful!")
                    time.sleep(1)  # Give time for message to show
                    st.rerun()
            elif submitted:
                # Tell the user why a blank submit did nothing.
                st.error("Please enter both username and password")

    with register_tab:
        with st.form("register_form", clear_on_submit=True):
            col1, col2 = st.columns(2)

            with col1:
                reg_username = st.text_input(
                    "Username",
                    help="Choose a unique username"
                )
                reg_email = st.text_input(
                    "Email",
                    help="Enter your email address"
                )

            with col2:
                reg_password = st.text_input(
                    "Password",
                    type="password",
                    help="Choose a strong password"
                )
                reg_confirm = st.text_input(
                    "Confirm Password",
                    type="password",
                    help="Re-enter your password"
                )

            # Password requirements info
            st.markdown("""
            Password must contain:
            - At least 8 characters
            - At least one uppercase letter
            - At least one lowercase letter
            - At least one number
            - At least one special character
            """)

            submitted = st.form_submit_button("Register")

            if submitted:
                if register_user(reg_username, reg_email, reg_password, reg_confirm):
                    st.success("Registration successful! Please login.")
                    time.sleep(2)  # Give time for message to show
                    st.rerun()
def get_user_accounts(user_id: str) -> List[Dict]:
    """
    Fetch the user's social accounts together with their heir's details.

    Accounts are joined to ``heirs`` (left join, so accounts without an heir
    are kept) and ordered by most recent activity first. Each entry carries
    ``id``, ``platform``, ``username``, ``status``, ``last_activity``,
    ``heir_id``, ``heir_name`` and ``heir_relationship``. Returns an empty
    list when there are none or when the query fails.
    """
    # Result keys, in the same order as the selected columns.
    fields = (
        'id',
        'platform',
        'username',
        'status',
        'last_activity',
        'heir_id',
        'heir_name',
        'heir_relationship',
    )
    try:
        cur = st.session_state.db.cursor()
        cur.execute("""
            SELECT
                sa.id,
                sa.platform,
                sa.username,
                sa.status,
                sa.last_activity,
                sa.heir_id,
                h.name as heir_name,
                h.relationship as heir_relationship
            FROM social_accounts sa
            LEFT JOIN heirs h ON sa.heir_id = h.id
            WHERE sa.user_id = ?
            ORDER BY sa.last_activity DESC
        """, (user_id,))

        return [
            {key: row[position] for position, key in enumerate(fields)}
            for row in cur.fetchall()
        ]
    except Exception as e:
        st.error(f"Failed to fetch accounts: {str(e)}")
        return []
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
    """
    Add a new social account with proper validation.

    Validation, in order: all fields present, ``days`` of at least 30, the
    heir exists and belongs to the logged-in user, and the platform/username
    pair is not already linked for this user. On success the account is
    stored with status ``'active'`` and a security event is logged.

    Args:
        platform: Platform name as it should be stored.
        username: Account name on that platform.
        password: Account password (stored as given - see the note below).
        heir_id: Id of the heir who inherits the account.
        days: Inactivity threshold in days.

    Returns:
        True when the account was committed, False otherwise (a UI message
        is shown).
    """
    try:
        if not all([platform, username, password, heir_id, days]):
            st.error("Please fill in all required fields")
            return False

        if days < 30:
            st.error("Inactivity threshold must be at least 30 days")
            return False

        user_id = st.session_state.user['id']
        cursor = st.session_state.db.cursor()

        # Check the heir exists and is owned by this user, so an account can
        # never be assigned to another user's heir or to a stale id.
        cursor.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?",
                       (heir_id, user_id))
        if not cursor.fetchone():
            st.error("Selected heir not found")
            return False

        # Check if account already exists
        cursor.execute("""
            SELECT id FROM social_accounts
            WHERE platform = ? AND username = ? AND user_id = ?
        """, (platform, username, user_id))
        if cursor.fetchone():
            st.error("This account is already linked")
            return False

        # Create account
        account_id = str(uuid4())
        current_time = datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username, password,
                heir_id, last_check, last_activity,
                inactivity_threshold, created_at, last_modified,
                status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            account_id,
            user_id,
            platform,
            username,
            # SECURITY: the password is stored in plain text. It must be
            # encrypted before this is used in production.
            password,
            heir_id,
            current_time,
            current_time,
            days,
            current_time,
            current_time,
            'active'
        ))
        st.session_state.db.commit()

        log_security_event(
            user_id,
            'account_added',
            f'Added {platform} account for user {username}'
        )
        st.success(f"Account {username} on {platform} added successfully!")
        time.sleep(1)  # Give time for message to show
        return True
    except Exception as e:
        st.error(f"Failed to add account: {str(e)}")
        return False
def show_dashboard():
    """
    Display the main dashboard.

    Shows a welcome line with navigation buttons (Add Account, Manage Heirs,
    Logout), then one row per linked account with its status, last activity,
    heir details and a Remove button. A missing or unparseable
    ``last_activity`` and a missing ``status`` are displayed as placeholders
    rather than aborting the page.
    """
    st.title("Digital Inheritance Dashboard")

    # Add navigation menu at top
    menu_col1, menu_col2, menu_col3, menu_col4 = st.columns([2, 1, 1, 1])
    with menu_col1:
        st.subheader(f"Welcome, {st.session_state.user['username']}")
    with menu_col2:
        if st.button("Add Account", key="add_account_btn", type="primary"):
            st.session_state.page = 'add_account'
            st.rerun()
    with menu_col3:
        if st.button("Manage Heirs", key="manage_heirs_btn"):
            st.session_state.page = 'manage_heirs'
            st.rerun()
    with menu_col4:
        if st.button("Logout", key="logout_btn", type="secondary"):
            logout_user()

    # Get and display accounts
    accounts = get_user_accounts(st.session_state.user['id'])

    if not accounts:
        st.info("No digital accounts added yet. Click 'Add Account' to get started.")
        return

    st.subheader("Your Digital Accounts")

    # Use columns for better layout
    for account in accounts:
        with st.container():
            col1, col2, col3 = st.columns([3, 3, 1])

            with col1:
                st.markdown(f"### {account['platform']} - {account['username']}")

                # The status column is nullable; fall back to a placeholder.
                status = account['status'] or "unknown"
                st.write(f"**Status:** {status.title()}")

                # last_activity may be NULL or not ISO-formatted; show the
                # raw value (or a placeholder) instead of crashing.
                raw_activity = account['last_activity']
                try:
                    last_activity = datetime.fromisoformat(raw_activity).strftime("%Y-%m-%d %H:%M")
                except (TypeError, ValueError):
                    last_activity = raw_activity if raw_activity else "Unknown"
                st.write(f"**Last Activity:** {last_activity}")

            with col2:
                heir = get_heir_info(account['heir_id'])
                if heir:
                    st.write(f"**Heir:** {heir['name']}")
                    st.write(f"**Relationship:** {heir['relationship']}")
                    if heir.get('email'):
                        st.write(f"**Contact:** {heir['email']}")

            with col3:
                if st.button("Remove", key=f"remove_{account['id']}", type="secondary"):
                    if remove_account(account['id']):
                        st.success("Account removed successfully!")
                        time.sleep(1)  # Give time for message to show
                        st.rerun()
def show_add_account_page():
    """
    Display the add account page.

    Requires at least one heir: without any, the user is pointed to heir
    management instead. Otherwise a form collects platform, credentials,
    heir and inactivity threshold and hands them to ``add_account``.
    """
    st.title("Add Digital Account")

    if st.button("β Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    available_heirs = get_heirs_list(st.session_state.user['id'])

    if not available_heirs:
        st.warning("Please add at least one heir before adding accounts.")
        if st.button("Add Heir"):
            st.session_state.page = 'manage_heirs'
            st.rerun()
        return

    def _heir_label(x):
        # Text shown for each heir in the select box.
        return f"{x['name']} ({x['relationship']})"

    with st.form("add_account_form", clear_on_submit=True):
        left, right = st.columns(2)

        with left:
            platform = st.selectbox(
                "Platform",
                ["Facebook", "Twitter", "Instagram", "LinkedIn"],
                help="Select the social media platform"
            )
            username = st.text_input(
                "Username",
                help="Enter your username for the selected platform"
            )

        with right:
            password = st.text_input(
                "Password",
                type="password",
                help="Enter your account password"
            )
            heir = st.selectbox(
                "Select Heir",
                options=available_heirs,
                format_func=_heir_label,
                help="Choose the heir for this account"
            )

        days = st.slider(
            "Inactivity Threshold (days)",
            min_value=30,
            max_value=365,
            value=90,
            help="Number of days of inactivity before notifying heir"
        )

        submitted = st.form_submit_button("Add Account", use_container_width=True)

        if submitted:
            if not heir:
                st.error("Please select an heir")
            else:
                add_account(platform, username, password, heir['id'], days)
def show_heir_management_page():
    """
    Display the heir management page.

    The "View Heirs" tab lists the user's heirs with a Remove button each;
    the "Add Heir" tab holds the form that calls ``add_heir``. After a
    removal the page is only rerun when the removal succeeded, so that the
    explanation shown by ``remove_heir`` on failure stays visible.
    """
    st.title("Manage Heirs")

    if st.button("β Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    tab1, tab2 = st.tabs(["View Heirs", "Add Heir"])

    with tab1:
        heirs = get_heirs_list(st.session_state.user['id'])
        if not heirs:
            st.info("No heirs added yet. Use the 'Add Heir' tab to get started.")
        else:
            for heir in heirs:
                with st.expander(f"{heir['name']} - {heir['relationship']}", expanded=True):
                    col1, col2 = st.columns([3, 1])
                    with col1:
                        st.write(f"**Email:** {heir['email']}")
                        if heir.get('phone'):
                            st.write(f"**Phone:** {heir['phone']}")
                    with col2:
                        if st.button("Remove", key=f"remove_heir_{heir['id']}", type="secondary"):
                            # Rerunning after a failed removal would erase
                            # the error message before it can be read.
                            if remove_heir(heir['id']):
                                st.rerun()

    with tab2:
        with st.form("add_heir_form", clear_on_submit=True):
            col1, col2 = st.columns(2)

            with col1:
                name = st.text_input(
                    "Full Name",
                    help="Enter the heir's full name"
                )
                email = st.text_input(
                    "Email",
                    help="Enter the heir's email address"
                )

            with col2:
                relationship = st.selectbox(
                    "Relationship",
                    ["Family", "Friend", "Legal Representative", "Other"],
                    help="Select your relationship with the heir"
                )
                phone = st.text_input(
                    "Phone (Optional)",
                    help="Enter the heir's phone number (optional)"
                )

            submitted = st.form_submit_button("Add Heir", use_container_width=True)

            if submitted:
                add_heir(name, email, relationship, phone)
def show_security_settings_page():
    """
    Display the security settings page.

    One tab reports the two-factor status and starts the 2FA setup flow;
    the other holds the change-password form.
    """
    st.title("Security Settings")

    if st.button("β Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    twofa_tab, password_tab = st.tabs(["Two-Factor Authentication", "Change Password"])

    with twofa_tab:
        twofa_enabled = get_2fa_status(st.session_state.user['id'])
        status_text = "Enabled" if twofa_enabled else "Disabled"
        st.write("Current Status:", status_text)

        if not twofa_enabled:
            if st.button("Set Up 2FA"):
                st.session_state.setup_2fa = True
                st.rerun()

            # The setup flow stays open while the flag is in the session.
            if hasattr(st.session_state, 'setup_2fa'):
                setup_2fa()

    with password_tab:
        with st.form("change_password_form", clear_on_submit=True):
            current_password = st.text_input(
                "Current Password",
                type="password",
                help="Enter your current password"
            )
            new_password = st.text_input(
                "New Password",
                type="password",
                help="Enter your new password"
            )
            confirm_password = st.text_input(
                "Confirm New Password",
                type="password",
                help="Confirm your new password"
            )

            if st.form_submit_button("Change Password", use_container_width=True):
                change_password(current_password, new_password, confirm_password)
def setup_2fa():
    """
    Run the two-factor setup flow for the logged-in user.

    A TOTP secret is generated once and kept in the session, its
    provisioning URI is displayed, and the user confirms with a code from
    their authenticator app. A valid code stores the secret through
    ``enable_2fa_for_user``; Cancel discards the pending setup.
    """
    session = st.session_state

    # Generate the secret only once per setup attempt.
    if 'totp_secret' not in session:
        session.totp_secret = pyotp.random_base32()

    totp = pyotp.TOTP(session.totp_secret)
    provisioning_uri = totp.provisioning_uri(
        session.user['email'],
        issuer_name="Digital Heir System"
    )

    st.write("1. Scan this QR code with your authenticator app:")
    st.code(provisioning_uri)  # Since we can't generate QR codes directly

    st.write("2. Enter the code from your authenticator app:")
    code = st.text_input("Verification Code", key="2fa_code")

    verify_col, cancel_col = st.columns([1, 3])

    with verify_col:
        if st.button("Verify Code"):
            if not totp.verify(code):
                st.error("Invalid code")
            elif enable_2fa_for_user(session.user['id'], session.totp_secret):
                st.success("2FA enabled successfully!")
                del session.totp_secret
                del session.setup_2fa
                st.rerun()

    with cancel_col:
        if st.button("Cancel"):
            del session.totp_secret
            del session.setup_2fa
            st.rerun()
def check_account_activity(platform, username):
    """
    Check social media account activity.

    Stub: for 'twitter' and 'facebook' the lookup URL is built but nothing
    is fetched yet; the function always returns None.
    """
    is_twitter = platform == 'twitter'
    if is_twitter:
        # Parse RSS feed for latest activity (not implemented yet)
        url = f"https://nitter.net/{username}/rss"
    elif platform == 'facebook':
        # Implementation needed
        url = f"https://www.facebook.com/{username}"
    return None
def remove_heir(heir_id: str) -> bool:
    """
    Remove one of the current user's heirs.

    The heir is NOT removed while any of the user's social accounts still
    reference them; an explanatory error is shown instead. Returns True
    when the heir was deleted, False otherwise.
    """
    try:
        cur = st.session_state.db.cursor()
        owner_id = st.session_state.user['id']

        # Count accounts that still point at this heir.
        cur.execute("""
            SELECT COUNT(*) FROM social_accounts
            WHERE heir_id = ? AND user_id = ?
        """, (heir_id, owner_id))
        assigned = cur.fetchone()[0]

        if assigned > 0:
            st.error(f"Cannot remove heir: {assigned} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
            return False

        # Nothing references the heir any more; delete it.
        cur.execute("""
            DELETE FROM heirs
            WHERE id = ? AND user_id = ?
        """, (heir_id, owner_id))
        st.session_state.db.commit()

        st.success("Heir removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove heir: {str(e)}")
        return False
def check_account_activity(platform, username):
    """Build the activity URL for the platform; stub that returns None."""
    if platform == 'facebook':
        url = f"https://www.facebook.com/{username}"
    elif platform == 'twitter':
        # Parse RSS feed for latest activity (not implemented yet)
        url = f"https://nitter.net/{username}/rss"
    return None
def remove_heir(heir_id: str) -> bool:
    """
    Delete an heir owned by the logged-in user, if no account uses them.

    When social accounts are still assigned to the heir, the deletion is
    refused with an error message and False is returned. True is returned
    only after the heir row has been deleted and committed.
    """
    count_sql = """
        SELECT COUNT(*) FROM social_accounts
        WHERE heir_id = ? AND user_id = ?
    """
    delete_sql = """
        DELETE FROM heirs
        WHERE id = ? AND user_id = ?
    """
    try:
        cursor = st.session_state.db.cursor()

        # First check if there are any accounts associated with this heir
        cursor.execute(count_sql, (heir_id, st.session_state.user['id']))
        account_count = cursor.fetchone()[0]
        still_assigned = account_count > 0

        if still_assigned:
            st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
            return False

        # If no accounts are associated, remove the heir
        cursor.execute(delete_sql, (heir_id, st.session_state.user['id']))
        st.session_state.db.commit()

        st.success("Heir removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove heir: {str(e)}")
        return False
def initialize_database():
    """
    Initialize database tables needed for the application.

    Creates ``login_attempt_history`` and ``social_accounts`` if they do not
    exist yet. The ``social_accounts`` schema includes every column that the
    account-creation code in this module writes (``password``,
    ``last_check``, ``inactivity_threshold``, ``created_at``,
    ``last_modified``), so inserts work on a freshly created database.
    Existing tables are left untouched by ``CREATE TABLE IF NOT EXISTS``.

    Failures are logged and reported in the UI; nothing is raised.
    """
    # NOTE(review): the users and heirs tables referenced below are not
    # created here - presumably they are set up elsewhere; confirm.
    try:
        cursor = st.session_state.db.cursor()

        # Create login_attempt_history table instead of login_attempts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS login_attempt_history (
                id TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                attempt_time TEXT NOT NULL,
                ip_address TEXT,
                success BOOLEAN DEFAULT FALSE
            )
        """)

        # Create other necessary tables if they don't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS social_accounts (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                platform TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT,
                status TEXT DEFAULT 'active',
                last_check TEXT,
                last_activity TEXT,
                inactivity_threshold INTEGER,
                heir_id TEXT,
                created_at TEXT,
                last_modified TEXT,
                FOREIGN KEY (user_id) REFERENCES users (id),
                FOREIGN KEY (heir_id) REFERENCES heirs (id)
            )
        """)

        st.session_state.db.commit()
    except Exception as e:
        logger.error(f"Database initialization failed: {str(e)}")
        st.error("Failed to initialize database. Please contact support.")
def initialize_services():
    """Hand the session's database connection to the audit logger, if any."""
    # Nothing to wire up until a connection has been stored in the session.
    if not hasattr(st.session_state, 'db'):
        return
    audit_logger.initialize_with_connection(st.session_state.db)
# Main initialization function
def initialize_app():
    """
    Initialize the application.

    Sets the page configuration, creates the database tables when a
    connection is available, defaults the current page to the login screen
    and initializes the dependent services.
    """
    page_settings = {
        "page_title": "DIGITAL INHERITANCE SYSTEM",
        "page_icon": "π",
        "layout": "wide",
    }
    st.set_page_config(**page_settings)

    session = st.session_state

    # Tables can only be created once a connection exists in the session.
    if 'db' in session:
        initialize_database()

    # First visit: start on the login page.
    if 'page' not in session:
        session.page = 'login'

    initialize_services()