HINTECH / routes /auth.py
Factor Studios
Upload 73 files
aaaaa79 verified
"""
Authentication Routes
Flask routes for user registration, login, password reset, and account management
"""
from flask import Blueprint, request, jsonify, current_app
from flask_cors import cross_origin
from models.user import db
from models.enhanced_user import User, VPNClient
from functools import wraps
import logging
import re
import secrets
from datetime import datetime, timedelta
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Blueprint on which every route in this module is registered.
auth_bp = Blueprint('auth', __name__)
def token_required(f):
    """Decorator that authenticates a request before running the view.

    The token is read from the second space-separated field of the
    ``Authorization`` header (``Bearer <token>``) and resolved to a user
    through ``User.verify_auth_token``. When that succeeds and the user is
    active, the wrapped view is called with the user as its first positional
    argument. Every failure path answers with a JSON error and HTTP 401.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        bearer = None
        header_value = request.headers.get('Authorization')
        if header_value is not None:
            pieces = header_value.split(" ")
            # A header without a second field cannot carry a token.
            if len(pieces) < 2:
                return jsonify({'error': 'Invalid token format'}), 401
            bearer = pieces[1]

        if not bearer:
            return jsonify({'error': 'Token is missing'}), 401

        try:
            account = User.verify_auth_token(bearer)
            if account is None:
                return jsonify({'error': 'Token is invalid or expired'}), 401
            if not account.is_active:
                return jsonify({'error': 'Account is deactivated'}), 401
        except Exception as exc:
            # Any failure while resolving the token is reported as 401.
            logger.error(f"Token verification error: {exc}")
            return jsonify({'error': 'Token verification failed'}), 401

        return f(account, *args, **kwargs)

    return guarded
def admin_required(f):
    """Decorator that restricts a view to administrators.

    The wrapped view must receive the authenticated user as its first
    positional argument (as ``token_required`` supplies it). Users whose
    ``is_admin`` flag is falsy get a JSON error with HTTP 403; otherwise the
    view runs unchanged.
    """
    @wraps(f)
    def admin_only(current_user, *args, **kwargs):
        if current_user.is_admin:
            return f(current_user, *args, **kwargs)
        return jsonify({'error': 'Admin privileges required'}), 403

    return admin_only
@auth_bp.route('/register', methods=['POST'])
@cross_origin()
def register():
    """Register a new user account and return its first token pair.

    Expects a JSON object with string fields ``username``, ``email`` and
    ``password``. An optional ``subscription_type`` ('free', 'premium' or
    'enterprise'; default 'free') selects the plan limits stored on the user;
    any other value is ignored.

    Returns:
        201 with the user, an auth token and a refresh token on success;
        400 for a missing/malformed body or invalid field values;
        409 when the username or e-mail is already taken;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: a malformed or non-JSON body yields None instead of
        # raising an HTTPException that the broad handler below would
        # otherwise convert into a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Required fields must be present, non-empty and strings, so the
        # .strip() calls below cannot fail on e.g. a JSON number.
        for field in ('username', 'email', 'password'):
            value = data.get(field)
            if not value:
                return jsonify({'error': f'{field} is required'}), 400
            if not isinstance(value, str):
                return jsonify({'error': f'{field} must be a string'}), 400

        username = data['username'].strip()
        email = data['email'].strip().lower()
        password = data['password']

        # Validate input format
        if not User.validate_username(username):
            return jsonify({
                'error': 'Username must be 3-80 characters and contain only letters, numbers, hyphens, and underscores'
            }), 400
        if not User.validate_email(email):
            return jsonify({'error': 'Invalid email format'}), 400
        if not User.validate_password_strength(password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Check if user already exists
        if User.query.filter_by(username=username).first():
            return jsonify({'error': 'Username already exists'}), 409
        if User.query.filter_by(email=email).first():
            return jsonify({'error': 'Email already registered'}), 409

        user = User(username=username, email=email, password=password)

        # NOTE(review): the caller picks its own plan here, including the
        # paid tiers, without any visible payment check - confirm this is
        # intended.
        # (max_concurrent_connections, bandwidth_limit_mbps) per paid plan.
        plan_limits = {
            'premium': (3, 50),
            'enterprise': (10, 100),
        }
        subscription_type = data.get('subscription_type', 'free')
        if subscription_type in ('free', 'premium', 'enterprise'):
            user.subscription_type = subscription_type
            limits = plan_limits.get(subscription_type)
            if limits is not None:
                user.max_concurrent_connections, user.bandwidth_limit_mbps = limits
                user.subscription_expires = datetime.utcnow() + timedelta(days=30)

        db.session.add(user)
        db.session.commit()
        logger.info(f"New user registered: {username} ({email})")

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()
        return jsonify({
            'message': 'User registered successfully',
            'user': user.to_dict(),
            'auth_token': auth_token,
            'refresh_token': refresh_token,
            'email_verification_required': not user.email_verified
        }), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.exception("Registration error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Registration failed'}), 500
@auth_bp.route('/login', methods=['POST'])
@cross_origin()
def login():
    """Authenticate a user by username or e-mail and return a token pair.

    Expects a JSON object with string fields ``login`` and ``password``. A
    ``login`` value containing '@' is looked up as a lower-cased e-mail,
    anything else as a username.

    Returns:
        200 with the user, an auth token and a refresh token on success;
        400 for a missing/malformed body or non-string fields;
        401 for unknown users, wrong passwords or deactivated accounts;
        423 while the account is locked;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        if 'login' not in data or 'password' not in data:
            return jsonify({'error': 'Login and password are required'}), 400
        login_field = data['login']
        password = data['password']
        if not isinstance(login_field, str) or not isinstance(password, str):
            return jsonify({'error': 'Login and password must be strings'}), 400
        login_field = login_field.strip()

        # Find user by username or email
        if '@' in login_field:
            user = User.query.filter_by(email=login_field.lower()).first()
        else:
            user = User.query.filter_by(username=login_field).first()

        if not user:
            return jsonify({'error': 'Invalid credentials'}), 401
        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401
        if user.is_account_locked():
            return jsonify({
                'error': 'Account is temporarily locked due to failed login attempts'
            }), 423

        if not user.check_password(password):
            # Persist whatever check_password recorded on the user
            # (presumably the failed-attempt counter - confirm in the model).
            db.session.commit()
            return jsonify({'error': 'Invalid credentials'}), 401

        # Update last login
        user.last_login = datetime.utcnow()
        db.session.commit()
        logger.info(f"User logged in: {user.username}")

        # Generate tokens
        auth_token = user.generate_auth_token()
        refresh_token = user.generate_refresh_token()
        return jsonify({
            'message': 'Login successful',
            'user': user.to_dict(include_sensitive='self'),
            'auth_token': auth_token,
            'refresh_token': refresh_token
        }), 200
    except Exception as e:
        logger.exception("Login error: %s", e)
        # Discard any half-applied changes so the session stays usable.
        db.session.rollback()
        return jsonify({'error': 'Login failed'}), 500
@auth_bp.route('/refresh', methods=['POST'])
@cross_origin()
def refresh_token():
    """Exchange a refresh token for a new auth/refresh token pair.

    Expects a JSON object with a ``refresh_token`` field.

    Returns:
        200 with the new tokens and the user on success;
        400 when the body is missing/malformed or has no ``refresh_token``;
        401 when the token does not resolve to a user or the account is
        deactivated;
        500 on any unexpected failure.
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'refresh_token' not in data:
            return jsonify({'error': 'Refresh token is required'}), 400

        # Named differently from the view function to avoid shadowing it.
        supplied_token = data['refresh_token']
        user = User.verify_refresh_token(supplied_token)
        if not user:
            return jsonify({'error': 'Invalid or expired refresh token'}), 401
        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 401

        # Generate new tokens
        new_auth_token = user.generate_auth_token()
        new_refresh_token = user.generate_refresh_token()
        return jsonify({
            'auth_token': new_auth_token,
            'refresh_token': new_refresh_token,
            'user': user.to_dict()
        }), 200
    except Exception as e:
        logger.exception("Token refresh error: %s", e)
        return jsonify({'error': 'Token refresh failed'}), 500
@auth_bp.route('/logout', methods=['POST'])
@cross_origin()
@token_required
def logout(current_user):
    """Acknowledge a logout request for the authenticated user.

    The token itself is not invalidated here; the handler only logs the
    event and answers 200 (or 500 if that fails).
    """
    try:
        # Token revocation is not implemented; success is reported as-is.
        logger.info(f"User logged out: {current_user.username}")
        body = {'message': 'Logout successful'}
        return jsonify(body), 200
    except Exception as exc:
        logger.error(f"Logout error: {exc}")
        failure = {'error': 'Logout failed'}
        return jsonify(failure), 500
@auth_bp.route('/profile', methods=['GET'])
@cross_origin()
@token_required
def get_profile(current_user):
    """Return the authenticated user's own profile.

    Answers 200 with ``current_user.to_dict(include_sensitive='self')`` under
    the ``user`` key, or 500 if serialising the user fails.
    """
    try:
        profile = current_user.to_dict(include_sensitive='self')
        return jsonify({'user': profile}), 200
    except Exception as exc:
        logger.error(f"Profile retrieval error: {exc}")
        return jsonify({'error': 'Failed to retrieve profile'}), 500
@auth_bp.route('/profile', methods=['PUT'])
@cross_origin()
@token_required
def update_profile(current_user):
    """Update the authenticated user's profile (currently only the e-mail).

    Expects a JSON object. When ``email`` is present and differs from the
    stored address, it is validated, checked for uniqueness, stored, marked
    unverified and given a fresh verification token.

    Returns:
        200 with the updated user on success;
        400 for a missing/malformed body or an invalid e-mail;
        409 when the e-mail belongs to another user;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Update allowed fields
        if 'email' in data:
            raw_email = data['email']
            # A non-string value (null, number, ...) cannot be an address.
            if not isinstance(raw_email, str):
                return jsonify({'error': 'Invalid email format'}), 400
            new_email = raw_email.strip().lower()
            if new_email != current_user.email:
                if not User.validate_email(new_email):
                    return jsonify({'error': 'Invalid email format'}), 400
                # Check if email is already taken
                existing_user = User.query.filter_by(email=new_email).first()
                if existing_user and existing_user.id != current_user.id:
                    return jsonify({'error': 'Email already registered'}), 409
                current_user.email = new_email
                # A changed address has to be verified again.
                current_user.email_verified = False
                current_user.email_verification_token = secrets.token_urlsafe(32)

        db.session.commit()
        logger.info(f"Profile updated for user: {current_user.username}")
        return jsonify({
            'message': 'Profile updated successfully',
            'user': current_user.to_dict(include_sensitive='self')
        }), 200
    except Exception as e:
        logger.exception("Profile update error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Profile update failed'}), 500
@auth_bp.route('/change-password', methods=['POST'])
@cross_origin()
@token_required
def change_password(current_user):
    """Change the authenticated user's password.

    Expects a JSON object with string fields ``current_password`` and
    ``new_password``.

    Returns:
        200 on success;
        400 for a missing/malformed body, missing or non-string fields, a
        weak new password, or a ``ValueError`` raised while setting it;
        401 when the current password does not match;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        for field in ('current_password', 'new_password'):
            if field not in data:
                return jsonify({'error': f'{field} is required'}), 400
            if not isinstance(data[field], str):
                return jsonify({'error': f'{field} must be a string'}), 400

        current_password = data['current_password']
        new_password = data['new_password']

        # Verify current password
        if not current_user.check_password(current_password):
            return jsonify({'error': 'Current password is incorrect'}), 401

        # Validate new password
        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'New password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        # Set new password
        current_user.set_password(new_password)
        db.session.commit()
        logger.info(f"Password changed for user: {current_user.username}")
        return jsonify({'message': 'Password changed successfully'}), 200
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.exception("Password change error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password change failed'}), 500
@auth_bp.route('/forgot-password', methods=['POST'])
@cross_origin()
def forgot_password():
    """Start a password reset for the account with the given e-mail.

    Expects a JSON object with a string ``email`` field. The response body
    and status are the same whether or not the address is registered, so the
    endpoint does not reveal which e-mails exist. The reset token is included
    in the response only when the Flask app runs in debug or testing mode;
    returning it unconditionally would let anyone who knows an address reset
    that account's password.

    Returns:
        200 with a generic message (plus ``reset_token`` in debug/testing);
        400 when the body is missing/malformed or has no usable ``email``;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not isinstance(data.get('email'), str):
            return jsonify({'error': 'Email is required'}), 400

        email = data['email'].strip().lower()
        # Same payload for known and unknown addresses.
        response = {
            'message': 'If the email exists, password reset instructions have been sent'
        }

        user = User.query.filter_by(email=email).first()
        if user:
            reset_token = user.generate_password_reset_token()
            db.session.commit()
            # In a production system, you would send an email here
            logger.info(f"Password reset requested for user: {user.username}")
            # Development aid only: never hand the token to the requester
            # outside debug/testing runs.
            if current_app.debug or current_app.testing:
                response['reset_token'] = reset_token

        return jsonify(response), 200
    except Exception as e:
        logger.exception("Password reset request error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password reset request failed'}), 500
@auth_bp.route('/reset-password', methods=['POST'])
@cross_origin()
def reset_password():
    """Set a new password using a previously issued reset token.

    Expects a JSON object with string fields ``email``, ``token`` and
    ``new_password``.

    Returns:
        200 on success;
        400 for a missing/malformed body, missing or non-string fields, an
        unknown e-mail, a weak password, a rejected token, or a
        ``ValueError`` raised by the model;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        for field in ('email', 'token', 'new_password'):
            if field not in data:
                return jsonify({'error': f'{field} is required'}), 400
            if not isinstance(data[field], str):
                return jsonify({'error': f'{field} must be a string'}), 400

        email = data['email'].strip().lower()
        token = data['token']
        new_password = data['new_password']

        # An empty token must never reach the model: it could compare equal
        # to an unset stored token (depends on User.reset_password - this
        # guard makes the route safe either way).
        if not token:
            return jsonify({'error': 'Invalid or expired reset token'}), 400

        user = User.query.filter_by(email=email).first()
        if not user:
            return jsonify({'error': 'Invalid reset request'}), 400

        if not User.validate_password_strength(new_password):
            return jsonify({
                'error': 'Password must be at least 8 characters with uppercase, lowercase, number, and special character'
            }), 400

        if user.reset_password(new_password, token):
            db.session.commit()
            logger.info(f"Password reset completed for user: {user.username}")
            return jsonify({'message': 'Password reset successfully'}), 200
        return jsonify({'error': 'Invalid or expired reset token'}), 400
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.exception("Password reset error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Password reset failed'}), 500
@auth_bp.route('/verify-email', methods=['POST'])
@cross_origin()
def verify_email():
    """Confirm an e-mail address using its verification token.

    Expects a JSON object with a ``token`` field. The user is looked up by
    that token and ``user.verify_email(token)`` decides the outcome.

    Returns:
        200 when the e-mail was verified;
        400 when the body is missing/malformed, the token is absent, empty,
        not a string, unknown, or rejected by the model;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # silent=True: malformed or non-JSON bodies become None rather than
        # an HTTPException that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'token' not in data:
            return jsonify({'error': 'Verification token is required'}), 400

        token = data['token']
        # Reject null/empty/non-string tokens up front: filtering on None
        # would match every user that has no pending verification token.
        if not isinstance(token, str) or not token:
            return jsonify({'error': 'Invalid verification token'}), 400

        # Find user by verification token
        user = User.query.filter_by(email_verification_token=token).first()
        if not user:
            return jsonify({'error': 'Invalid verification token'}), 400

        if user.verify_email(token):
            db.session.commit()
            logger.info(f"Email verified for user: {user.username}")
            return jsonify({'message': 'Email verified successfully'}), 200
        return jsonify({'error': 'Email verification failed'}), 400
    except Exception as e:
        logger.exception("Email verification error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Email verification failed'}), 500
@auth_bp.route('/users', methods=['GET'])
@cross_origin()
@token_required
@admin_required
def list_users(current_user):
    """List users with pagination and optional search (admin only).

    Query parameters:
        page: page number, default 1.
        per_page: page size, default 20.
        search: optional substring matched literally against username and
            e-mail.

    Returns:
        200 with the serialised users plus ``total``, ``pages``,
        ``current_page`` and ``per_page`` as actually applied by the
        paginator; 500 on any unexpected failure.
    """
    try:
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)
        search = request.args.get('search', '')

        query = User.query
        if search:
            # autoescape=True makes '%' and '_' in the search text match
            # literally instead of acting as LIKE wildcards.
            query = query.filter(
                db.or_(
                    User.username.contains(search, autoescape=True),
                    User.email.contains(search, autoescape=True)
                )
            )

        # A fixed ordering keeps page contents stable between requests.
        users = query.order_by(User.id).paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )
        return jsonify({
            'users': [user.to_dict(include_sensitive=True) for user in users.items],
            'total': users.total,
            'pages': users.pages,
            # Report what the paginator used; with error_out=False it may
            # have replaced out-of-range request values.
            'current_page': users.page,
            'per_page': users.per_page
        }), 200
    except Exception as e:
        logger.exception("User listing error: %s", e)
        return jsonify({'error': 'Failed to retrieve users'}), 500
@auth_bp.route('/users/<int:user_id>', methods=['GET'])
@cross_origin()
@token_required
@admin_required
def get_user(current_user, user_id):
    """Return one user's details (admin only).

    Returns:
        200 with the serialised user;
        404 when no user has ``user_id``;
        500 on any unexpected failure.
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would swallow and report as a 500, so the miss is handled
        # explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404
        return jsonify({
            'user': user.to_dict(include_sensitive=True)
        }), 200
    except Exception as e:
        logger.exception("User retrieval error: %s", e)
        return jsonify({'error': 'Failed to retrieve user'}), 500
@auth_bp.route('/users/<int:user_id>/deactivate', methods=['POST'])
@cross_origin()
@token_required
@admin_required
def deactivate_user(current_user, user_id):
    """Deactivate a user account (admin only).

    An admin cannot deactivate their own account.

    Returns:
        200 on success;
        400 when ``user_id`` is the caller's own id;
        404 when no user has ``user_id``;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would swallow and report as a 500, so the miss is handled
        # explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404
        if user.id == current_user.id:
            return jsonify({'error': 'Cannot deactivate your own account'}), 400

        user.is_active = False
        db.session.commit()
        logger.info(f"User deactivated by admin {current_user.username}: {user.username}")
        return jsonify({'message': 'User deactivated successfully'}), 200
    except Exception as e:
        logger.exception("User deactivation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to deactivate user'}), 500
@auth_bp.route('/users/<int:user_id>/activate', methods=['POST'])
@cross_origin()
@token_required
@admin_required
def activate_user(current_user, user_id):
    """Activate a user account and clear its lockout state (admin only).

    Sets ``is_active`` to True, resets ``failed_login_attempts`` to 0 and
    clears ``account_locked_until``.

    Returns:
        200 on success;
        404 when no user has ``user_id``;
        500 on any unexpected failure (the session is rolled back).
    """
    try:
        # get_or_404 raises an HTTPException that the broad handler below
        # would swallow and report as a 500, so the miss is handled
        # explicitly.
        user = User.query.get(user_id)
        if user is None:
            return jsonify({'error': 'User not found'}), 404

        user.is_active = True
        user.failed_login_attempts = 0
        user.account_locked_until = None
        db.session.commit()
        logger.info(f"User activated by admin {current_user.username}: {user.username}")
        return jsonify({'message': 'User activated successfully'}), 200
    except Exception as e:
        logger.exception("User activation error: %s", e)
        db.session.rollback()
        return jsonify({'error': 'Failed to activate user'}), 500