Spaces:
Paused
Paused
| """ | |
| Authentication Routes | |
| Flask routes for user registration, login, password reset, and account management | |
| """ | |
| from flask import Blueprint, request, jsonify, current_app | |
| from flask_cors import cross_origin | |
| from models.user import db | |
| from models.enhanced_user import User, VPNClient | |
| from functools import wraps | |
| import logging | |
| import re | |
| import secrets | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| auth_bp = Blueprint('auth', __name__) | |
def token_required(f):
    """Decorator that requires a valid bearer token on the request.

    The token is read from the ``Authorization`` header (``Bearer <token>``)
    and resolved with ``User.verify_auth_token``. On success the wrapped view
    is called with the resolved user as its first positional argument.

    The wrapper answers with HTTP 401 when the header is missing or malformed,
    when the token does not resolve to a user, when the user is not active,
    or when verification raises.
    """
    # wraps() keeps the view's __name__/__doc__. Flask derives endpoint names
    # from __name__, so without it every protected view would be registered
    # as "decorated" and the second registration would fail.
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        # Get token from Authorization header
        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            try:
                token = auth_header.split(" ")[1]  # Bearer <token>
            except IndexError:
                return jsonify({'error': 'Invalid token format'}), 401
        if not token:
            return jsonify({'error': 'Token is missing'}), 401
        try:
            current_user = User.verify_auth_token(token)
            if current_user is None:
                return jsonify({'error': 'Token is invalid or expired'}), 401
            if not current_user.is_active:
                return jsonify({'error': 'Account is deactivated'}), 401
        except Exception as e:
            logger.error(f"Token verification error: {e}")
            return jsonify({'error': 'Token verification failed'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
def admin_required(f):
    """Decorator that requires the current user to be an administrator.

    The wrapped view must receive the authenticated user as its first
    positional argument. When ``current_user.is_admin`` is falsy the wrapper
    answers with HTTP 403; otherwise the view is called unchanged.
    """
    # wraps() keeps the view's __name__ so Flask endpoint names stay unique
    # when several views share this decorator.
    @wraps(f)
    def decorated(current_user, *args, **kwargs):
        if not current_user.is_admin:
            return jsonify({'error': 'Admin privileges required'}), 403
        return f(current_user, *args, **kwargs)
    return decorated
def register():
    """User registration endpoint.

    Expects a JSON object with ``username``, ``email`` and ``password`` and an
    optional ``subscription_type`` (``free``, ``premium`` or ``enterprise``).

    Returns:
        A ``(response, status)`` pair: 201 with the new user, an auth token
        and a refresh token; 400 for a missing/invalid body or field; 409
        when the username or email is already taken; 500 on unexpected
        errors.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None instead of
        # raising an HTTPException, which the broad handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields (present, non-empty, and strings so that
        # the .strip() calls below cannot fail).
        for field in ('username', 'email', 'password'):
            if field not in data or not data[field]:
                return jsonify({'error': f'{field} is required'}), 400
            if not isinstance(data[field], str):
                return jsonify({'error': f'{field} must be a string'}), 400

        username = data['username'].strip()
        email = data['email'].strip().lower()
        password = data['password']

        # Validate input format
        if not User.validate_username(username):
            return jsonify({
                'error': 'Username must be 3-80 characters and contain only letters, numbers, hyphens, and underscores'
            }), 400
        if not User.validate_email(email):
            return jsonify({'error': 'Invalid email format'}), 400
        if not User.validate_password_strength(password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Check if user already exists
        if User.query.filter_by(username=username).first():
            return jsonify({'error': 'Username already exists'}), 409
        if User.query.filter_by(email=email).first():
            return jsonify({'error': 'Email already registered'}), 409

        # Create new user
        user = User(username=username, email=email, password=password)

        # Set subscription based on registration data.
        # NOTE(review): the client picks its own tier here, including paid
        # ones, with no payment check visible in this function -- confirm
        # this is intended.
        paid_limits = {
            # tier: (max concurrent connections, bandwidth limit in Mbps)
            'premium': (3, 50),
            'enterprise': (10, 100),
        }
        subscription_type = data.get('subscription_type', 'free')
        if subscription_type in ('free', 'premium', 'enterprise'):
            user.subscription_type = subscription_type
            if subscription_type in paid_limits:
                connections, bandwidth = paid_limits[subscription_type]
                user.max_concurrent_connections = connections
                user.bandwidth_limit_mbps = bandwidth
                user.subscription_expires = datetime.utcnow() + timedelta(days=30)

        db.session.add(user)
        db.session.commit()
        logger.info(f"New user registered: {username} ({email})")

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()
        return jsonify({
            'message': 'User registered successfully',
            'user': user.to_dict(),
            'auth_token': auth_token,
            'refresh_token': refresh_token,
            'email_verification_required': not user.email_verified
        }), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception("Registration error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Registration failed'}), 500
def login():
    """User login endpoint.

    Expects a JSON object with ``login`` (a username, or an email when it
    contains ``@``) and ``password``.

    Returns:
        A ``(response, status)`` pair: 200 with the user and fresh auth and
        refresh tokens; 400 for a missing/invalid body; 401 for bad
        credentials or a deactivated account; 423 when the account is
        locked; 500 on unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        if 'login' not in data or 'password' not in data:
            return jsonify({'error': 'Login and password are required'}), 400
        login_value = data['login']
        password = data['password']
        # Non-string values would make .strip() raise and surface as a 500.
        if not isinstance(login_value, str) or not isinstance(password, str):
            return jsonify({'error': 'Login and password are required'}), 400
        login_field = login_value.strip()

        # Find user by username or email
        if '@' in login_field:
            user = User.query.filter_by(email=login_field.lower()).first()
        else:
            user = User.query.filter_by(username=login_field).first()

        if not user:
            return jsonify({'error': 'Invalid credentials'}), 401
        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401
        if user.is_account_locked():
            return jsonify({
                'error': 'Account is temporarily locked due to failed login attempts'
            }), 423
        if not user.check_password(password):
            db.session.commit()  # Save failed attempt count
            return jsonify({'error': 'Invalid credentials'}), 401

        # Update last login
        user.last_login = datetime.utcnow()
        db.session.commit()
        logger.info(f"User logged in: {user.username}")

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()
        return jsonify({
            'message': 'Login successful',
            'user': user.to_dict(include_sensitive='self'),
            'auth_token': auth_token,
            'refresh_token': refresh_token
        }), 200
    except Exception as e:
        logger.exception("Login error: %s", e)
        # A failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        return jsonify({'error': 'Login failed'}), 500
def refresh_token():
    """Refresh authentication token.

    Expects a JSON object with ``refresh_token``. The token is resolved with
    ``User.verify_refresh_token`` and a new auth/refresh token pair is issued.

    Returns:
        A ``(response, status)`` pair: 200 with the new tokens and the user;
        400 when the body or token is missing; 401 when the token is
        invalid/expired or the account is deactivated; 500 on unexpected
        errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'refresh_token' not in data:
            return jsonify({'error': 'Refresh token is required'}), 400

        # Named differently from the function so the view is not shadowed.
        presented_token = data['refresh_token']
        user = User.verify_refresh_token(presented_token)
        if not user:
            return jsonify({'error': 'Invalid or expired refresh token'}), 401
        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401

        # Generate new tokens
        new_auth_token = user.generate_auth_token()
        new_refresh_token = user.generate_refresh_token()
        return jsonify({
            'auth_token': new_auth_token,
            'refresh_token': new_refresh_token,
            'user': user.to_dict()
        }), 200
    except Exception as e:
        logger.exception("Token refresh error: %s", e)
        return jsonify({'error': 'Token refresh failed'}), 500
def logout(current_user):
    """Acknowledge a logout request for ``current_user``.

    No token is invalidated here; the call is only logged and confirmed.
    ``current_user`` is presumably injected by ``token_required`` -- no
    decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 on success, 500 if logging or
        building the response fails.
    """
    try:
        # In a production system, you would invalidate the token
        # For now, we just return success
        logger.info(f"User logged out: {current_user.username}")
        success = jsonify({'message': 'Logout successful'})
    except Exception as e:
        logger.error(f"Logout error: {e}")
        return jsonify({'error': 'Logout failed'}), 500
    else:
        return success, 200
def get_profile(current_user):
    """Return the profile of ``current_user``.

    The user is serialised with ``to_dict(include_sensitive='self')``.
    ``current_user`` is presumably injected by ``token_required`` -- no
    decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 with the profile under ``user``,
        500 if serialisation fails.
    """
    try:
        profile = current_user.to_dict(include_sensitive='self')
        payload = jsonify({'user': profile})
    except Exception as e:
        logger.error(f"Profile retrieval error: {e}")
        return jsonify({'error': 'Failed to retrieve profile'}), 500
    else:
        return payload, 200
def update_profile(current_user):
    """Update the profile of ``current_user``.

    Only ``email`` can be changed. A changed email is validated, checked for
    uniqueness, marked unverified and given a new verification token.
    ``current_user`` is presumably injected by ``token_required`` -- no
    decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 with the updated user; 400 for a
        missing body or invalid email; 409 when the email belongs to another
        user; 500 on unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Update allowed fields
        if 'email' in data:
            raw_email = data['email']
            # A non-string email would make .strip() raise and become a 500.
            if not isinstance(raw_email, str):
                return jsonify({'error': 'Invalid email format'}), 400
            new_email = raw_email.strip().lower()
            if new_email != current_user.email:
                if not User.validate_email(new_email):
                    return jsonify({'error': 'Invalid email format'}), 400
                # Check if email is already taken
                existing_user = User.query.filter_by(email=new_email).first()
                if existing_user and existing_user.id != current_user.id:
                    return jsonify({'error': 'Email already registered'}), 409
                current_user.email = new_email
                # The new address has not been confirmed yet.
                current_user.email_verified = False
                current_user.email_verification_token = secrets.token_urlsafe(32)

        db.session.commit()
        logger.info(f"Profile updated for user: {current_user.username}")
        return jsonify({
            'message': 'Profile updated successfully',
            'user': current_user.to_dict(include_sensitive='self')
        }), 200
    except Exception as e:
        logger.exception("Profile update error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Profile update failed'}), 500
def change_password(current_user):
    """Change the password of ``current_user``.

    Expects a JSON object with ``current_password`` and ``new_password``.
    ``current_user`` is presumably injected by ``token_required`` -- no
    decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 on success; 400 for a missing
        body/field, a weak new password, or a ``ValueError`` raised while
        setting it; 401 when the current password is wrong; 500 on
        unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields; non-string values are rejected as well
        # so the password helpers only ever receive strings.
        for field in ('current_password', 'new_password'):
            if not isinstance(data.get(field), str):
                return jsonify({'error': f'{field} is required'}), 400

        current_password = data['current_password']
        new_password = data['new_password']

        # Verify current password
        if not current_user.check_password(current_password):
            return jsonify({'error': 'Current password is incorrect'}), 401

        # Validate new password
        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'New password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Set new password
        current_user.set_password(new_password)
        db.session.commit()
        logger.info(f"Password changed for user: {current_user.username}")
        return jsonify({'message': 'Password changed successfully'}), 200
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.exception("Password change error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password change failed'}), 500
def forgot_password():
    """Request a password reset.

    Expects a JSON object with ``email``. When a user with that email
    exists, a reset token is generated and committed. The response message
    is the same whether or not the email is registered, so the endpoint
    cannot be used to probe for accounts.

    The reset token is included in the response only when the application
    runs in debug or testing mode; returning it unconditionally would let
    any caller reset any account.

    Returns:
        A ``(response, status)`` pair: 200 with a generic message; 400 when
        the email is missing; 500 on unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not isinstance(data.get('email'), str):
            return jsonify({'error': 'Email is required'}), 400

        email = data['email'].strip().lower()
        user = User.query.filter_by(email=email).first()

        # Don't reveal if email exists: one message for both outcomes.
        body = {
            'message': 'If the email exists, password reset instructions have been sent'
        }
        if user:
            reset_token = user.generate_password_reset_token()
            db.session.commit()
            # In a production system, you would send an email here
            logger.info(f"Password reset requested for user: {user.username}")
            # Development convenience only; never expose the token otherwise.
            if current_app.debug or current_app.testing:
                body['reset_token'] = reset_token
        return jsonify(body), 200
    except Exception as e:
        logger.exception("Password reset request error: %s", e)
        # A failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        return jsonify({'error': 'Password reset request failed'}), 500
def reset_password():
    """Reset a password using a reset token.

    Expects a JSON object with ``email``, ``token`` and ``new_password``.
    The user is looked up by email and ``user.reset_password`` decides
    whether the token is accepted.

    Returns:
        A ``(response, status)`` pair: 200 on success; 400 for a missing
        body/field, an unknown email, a weak password, a rejected token or a
        ``ValueError`` raised while resetting; 500 on unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields; non-string values are rejected too so
        # .strip() and the model helpers only ever receive strings.
        for field in ('email', 'token', 'new_password'):
            if not isinstance(data.get(field), str):
                return jsonify({'error': f'{field} is required'}), 400

        email = data['email'].strip().lower()
        token = data['token']
        new_password = data['new_password']

        user = User.query.filter_by(email=email).first()
        if not user:
            return jsonify({'error': 'Invalid reset request'}), 400

        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        if not user.reset_password(new_password, token):
            return jsonify({'error': 'Invalid or expired reset token'}), 400

        db.session.commit()
        logger.info(f"Password reset completed for user: {user.username}")
        return jsonify({'message': 'Password reset successfully'}), 200
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.exception("Password reset error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password reset failed'}), 500
def verify_email():
    """Verify an email address.

    Expects a JSON object with ``token``. The user is looked up by
    ``email_verification_token`` and ``user.verify_email`` decides whether
    the verification succeeds.

    Returns:
        A ``(response, status)`` pair: 200 on success; 400 when the token is
        missing, not a non-empty string, unknown, or rejected; 500 on
        unexpected errors.
    """
    try:
        # silent=True: malformed JSON gives None (handled as 400 below)
        # instead of an HTTPException that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'token' not in data:
            return jsonify({'error': 'Verification token is required'}), 400

        token = data['token']
        # A null/empty token must never reach the query: filtering on None
        # becomes "IS NULL" and would match users with no pending token.
        if not isinstance(token, str) or not token:
            return jsonify({'error': 'Verification token is required'}), 400

        # Find user by verification token
        user = User.query.filter_by(email_verification_token=token).first()
        if not user:
            return jsonify({'error': 'Invalid verification token'}), 400

        if not user.verify_email(token):
            return jsonify({'error': 'Email verification failed'}), 400

        db.session.commit()
        logger.info(f"Email verified for user: {user.username}")
        return jsonify({'message': 'Email verified successfully'}), 200
    except Exception as e:
        logger.exception("Email verification error: %s", e)
        # A failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        return jsonify({'error': 'Email verification failed'}), 500
def list_users(current_user):
    """List users, paginated, with an optional search filter.

    Query parameters: ``page`` (default 1), ``per_page`` (default 20,
    clamped to 1..100) and ``search`` (substring matched against username
    and email). Described as admin-only by the original docstring; no access
    control decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 with ``users``, ``total``,
        ``pages``, ``current_page`` and ``per_page``; 500 on unexpected
        errors.
    """
    max_per_page = 100  # upper bound so one request cannot dump the table
    try:
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)
        search = request.args.get('search', '')

        # Clamp client-supplied paging values to a sane range; the values
        # echoed in the response are the ones actually used.
        page = max(page, 1)
        per_page = min(max(per_page, 1), max_per_page)

        query = User.query
        if search:
            query = query.filter(
                db.or_(
                    User.username.contains(search),
                    User.email.contains(search)
                )
            )

        users = query.paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )
        return jsonify({
            'users': [user.to_dict(include_sensitive=True) for user in users.items],
            'total': users.total,
            'pages': users.pages,
            'current_page': page,
            'per_page': per_page
        }), 200
    except Exception as e:
        logger.exception("User listing error: %s", e)
        return jsonify({'error': 'Failed to retrieve users'}), 500
def get_user(current_user, user_id):
    """Return the details of the user identified by ``user_id``.

    Described as admin-only by the original docstring; no access control
    decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 with the user serialised via
        ``to_dict(include_sensitive=True)``; 404 when no such user exists;
        500 on unexpected errors.
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would report as a 500, so the miss is handled explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404
        return jsonify({
            'user': user.to_dict(include_sensitive=True)
        }), 200
    except Exception as e:
        logger.exception("User retrieval error: %s", e)
        return jsonify({'error': 'Failed to retrieve user'}), 500
def deactivate_user(current_user, user_id):
    """Deactivate the account identified by ``user_id``.

    Described as admin-only by the original docstring; no access control
    decorator is visible on this function in this excerpt. A user cannot
    deactivate their own account.

    Returns:
        A ``(response, status)`` pair: 200 on success; 400 when the target
        is the caller; 404 when no such user exists; 500 on unexpected
        errors.
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would report as a 500, so the miss is handled explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404
        if user.id == current_user.id:
            return jsonify({'error': 'Cannot deactivate your own account'}), 400

        user.is_active = False
        db.session.commit()
        logger.info(f"User deactivated by admin {current_user.username}: {user.username}")
        return jsonify({'message': 'User deactivated successfully'}), 200
    except Exception as e:
        logger.exception("User deactivation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to deactivate user'}), 500
def activate_user(current_user, user_id):
    """Activate the account identified by ``user_id``.

    Besides setting ``is_active``, the failed-login counter and the lock
    expiry are cleared. Described as admin-only by the original docstring;
    no access control decorator is visible on this function in this excerpt.

    Returns:
        A ``(response, status)`` pair: 200 on success; 404 when no such
        user exists; 500 on unexpected errors.
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would report as a 500, so the miss is handled explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404

        user.is_active = True
        # Clear any lockout state so the user can log in immediately.
        user.failed_login_attempts = 0
        user.account_locked_until = None
        db.session.commit()
        logger.info(f"User activated by admin {current_user.username}: {user.username}")
        return jsonify({'message': 'User activated successfully'}), 200
    except Exception as e:
        logger.exception("User activation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to activate user'}), 500