# auth.py """ Authentication logic: signup, login, token management """ from flask import jsonify from database import get_supabase from crypto_utils import ( hash_password, verify_password, hash_username_for_storage, generate_encryption_key ) import jwt from datetime import datetime, timedelta import os # JWT Configuration JWT_SECRET = os.getenv("JWT_SECRET", "change-this-secret-in-production") JWT_ALGORITHM = "HS256" JWT_EXPIRATION_DAYS = 7 def create_jwt_token(user_id: str, username_hash: str) -> str: """ Create JWT token for authenticated user Args: user_id: User UUID from database username_hash: Hashed username Returns: JWT token string """ expiration = datetime.utcnow() + timedelta(days=JWT_EXPIRATION_DAYS) payload = { "user_id": user_id, "username_hash": username_hash, "exp": expiration, "iat": datetime.utcnow() } token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) return token def verify_jwt_token(token: str) -> dict: """ Verify and decode JWT token Args: token: JWT token string Returns: Decoded payload dict or None if invalid """ try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) return payload except jwt.ExpiredSignatureError: print("Token expired") return None except jwt.InvalidTokenError: print("Invalid token") return None def signup_user(username: str, password: str): """ Register new user Args: username: Plain text username password: Plain text password Returns: Tuple of (result_dict, status_code) """ supabase = get_supabase() # Validate input if not username or not password: return {"error": "Username and password required"}, 400 if len(username) < 3 or len(username) > 20: return {"error": "Username must be 3-20 characters"}, 400 if not username.isalnum(): return {"error": "Username must be alphanumeric"}, 400 if len(password) < 6: return {"error": "Password must be at least 6 characters"}, 400 # Create username hash for storage username_hash = hash_username_for_storage(username) # Check if username exists try: result = supabase.table('users').select('*').eq('username_hash', username_hash).execute() if result.data: return {"error": "Username already exists"}, 400 except Exception as e: return {"error": f"Database error: {str(e)}"}, 500 # Hash password password_hash = hash_password(password) # Generate encryption key encryption_key = generate_encryption_key() # Insert user try: user_data = { 'username_hash': username_hash, 'password_hash': password_hash, 'encryption_key': encryption_key, 'created_at': datetime.utcnow().isoformat(), 'is_active': True } result = supabase.table('users').insert(user_data).execute() if not result.data: return {"error": "Failed to create user"}, 500 user = result.data[0] # Create JWT token token = create_jwt_token(user['id'], username_hash) return { "user_id": user['id'], "username": username, "username_hash": username_hash, "encryption_key": encryption_key, "token": token }, 200 except Exception as e: return {"error": f"Failed to create user: {str(e)}"}, 500 def login_user(username: str, password: str): """ Login existing user Args: username: Plain text username password: Plain text password Returns: Tuple of (result_dict, status_code) """ supabase = get_supabase() # Create username hash username_hash = hash_username_for_storage(username) # Get user from database try: result = supabase.table('users').select('*').eq('username_hash', username_hash).execute() if not result.data: return {"error": "Invalid username or password"}, 401 user = result.data[0] # Check if account is active if not user.get('is_active', True): return {"error": "Account is disabled"}, 403 # Verify password if not verify_password(password, user['password_hash']): return {"error": "Invalid username or password"}, 401 # Update last login supabase.table('users').update({ 'last_login': datetime.utcnow().isoformat() }).eq('id', user['id']).execute() # Create JWT token token = create_jwt_token(user['id'], username_hash) return { "user_id": user['id'], "username": username, "username_hash": username_hash, "encryption_key": user['encryption_key'], "token": token }, 200 except Exception as e: return {"error": f"Login failed: {str(e)}"}, 500 def check_username_exists(username: str): """ Check if username exists (for adding friends) Args: username: Plain text username Returns: Tuple of (result_dict, status_code) """ supabase = get_supabase() username_hash = hash_username_for_storage(username) try: result = supabase.table('users').select('id').eq('username_hash', username_hash).execute() exists = len(result.data) > 0 return { "exists": exists, "username": username if exists else None }, 200 except Exception as e: return {"error": f"Check failed: {str(e)}"}, 500 def verify_token(token: str): """ Verify JWT token and return user info Args: token: JWT token string (may include "Bearer " prefix) Returns: User payload dict or None """ if not token: return None # Remove "Bearer " prefix if present token = token.replace("Bearer ", "") payload = verify_jwt_token(token) if not payload: return None return payload