|
|
"""
|
|
|
User authentication models and validation for OpenManus
|
|
|
Mobile number + password based authentication system
|
|
|
"""
|
|
|
|
|
|
import hashlib
|
|
|
import re
|
|
|
import secrets
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import Optional
|
|
|
from dataclasses import dataclass
|
|
|
from pydantic import BaseModel, validator
|
|
|
|
|
|
|
|
|
class UserSignupRequest(BaseModel):
|
|
|
"""User signup request model"""
|
|
|
|
|
|
full_name: str
|
|
|
mobile_number: str
|
|
|
password: str
|
|
|
confirm_password: str
|
|
|
|
|
|
@validator("full_name")
|
|
|
def validate_full_name(cls, v):
|
|
|
if not v or len(v.strip()) < 2:
|
|
|
raise ValueError("Full name must be at least 2 characters long")
|
|
|
if len(v.strip()) > 100:
|
|
|
raise ValueError("Full name must be less than 100 characters")
|
|
|
return v.strip()
|
|
|
|
|
|
@validator("mobile_number")
|
|
|
def validate_mobile_number(cls, v):
|
|
|
|
|
|
digits_only = re.sub(r"\D", "", v)
|
|
|
|
|
|
|
|
|
if len(digits_only) < 10 or len(digits_only) > 15:
|
|
|
raise ValueError("Mobile number must be between 10-15 digits")
|
|
|
|
|
|
|
|
|
if not re.match(r"^(\+?[1-9]\d{9,14})$", digits_only):
|
|
|
raise ValueError("Invalid mobile number format")
|
|
|
|
|
|
return digits_only
|
|
|
|
|
|
@validator("password")
|
|
|
def validate_password(cls, v):
|
|
|
if len(v) < 8:
|
|
|
raise ValueError("Password must be at least 8 characters long")
|
|
|
if len(v) > 128:
|
|
|
raise ValueError("Password must be less than 128 characters")
|
|
|
|
|
|
|
|
|
if not re.search(r"[A-Z]", v):
|
|
|
raise ValueError("Password must contain at least one uppercase letter")
|
|
|
if not re.search(r"[a-z]", v):
|
|
|
raise ValueError("Password must contain at least one lowercase letter")
|
|
|
if not re.search(r"\d", v):
|
|
|
raise ValueError("Password must contain at least one digit")
|
|
|
|
|
|
return v
|
|
|
|
|
|
@validator("confirm_password")
|
|
|
def validate_confirm_password(cls, v, values):
|
|
|
if "password" in values and v != values["password"]:
|
|
|
raise ValueError("Passwords do not match")
|
|
|
return v
|
|
|
|
|
|
|
|
|
class UserLoginRequest(BaseModel):
|
|
|
"""User login request model"""
|
|
|
|
|
|
mobile_number: str
|
|
|
password: str
|
|
|
|
|
|
@validator("mobile_number")
|
|
|
def validate_mobile_number(cls, v):
|
|
|
|
|
|
digits_only = re.sub(r"\D", "", v)
|
|
|
|
|
|
if len(digits_only) < 10 or len(digits_only) > 15:
|
|
|
raise ValueError("Invalid mobile number")
|
|
|
|
|
|
return digits_only
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
class User:
|
|
|
"""User model"""
|
|
|
|
|
|
id: str
|
|
|
mobile_number: str
|
|
|
full_name: str
|
|
|
password_hash: str
|
|
|
avatar_url: Optional[str] = None
|
|
|
preferences: Optional[str] = None
|
|
|
is_active: bool = True
|
|
|
created_at: Optional[datetime] = None
|
|
|
updated_at: Optional[datetime] = None
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
class UserSession:
|
|
|
"""User session model"""
|
|
|
|
|
|
session_id: str
|
|
|
user_id: str
|
|
|
mobile_number: str
|
|
|
full_name: str
|
|
|
created_at: datetime
|
|
|
expires_at: datetime
|
|
|
|
|
|
@property
|
|
|
def is_valid(self) -> bool:
|
|
|
"""Check if session is still valid"""
|
|
|
return datetime.utcnow() < self.expires_at
|
|
|
|
|
|
|
|
|
class UserAuth:
|
|
|
"""User authentication utilities"""
|
|
|
|
|
|
@staticmethod
|
|
|
def hash_password(password: str) -> str:
|
|
|
"""Hash password using SHA-256 with salt"""
|
|
|
salt = secrets.token_hex(32)
|
|
|
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
|
|
|
return f"{salt}:{password_hash}"
|
|
|
|
|
|
@staticmethod
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
|
|
|
"""Verify password against stored hash"""
|
|
|
try:
|
|
|
salt, stored_hash = password_hash.split(":")
|
|
|
password_hash_check = hashlib.sha256((password + salt).encode()).hexdigest()
|
|
|
return password_hash_check == stored_hash
|
|
|
except ValueError:
|
|
|
return False
|
|
|
|
|
|
@staticmethod
|
|
|
def generate_session_id() -> str:
|
|
|
"""Generate secure session ID"""
|
|
|
return secrets.token_urlsafe(32)
|
|
|
|
|
|
@staticmethod
|
|
|
def generate_user_id() -> str:
|
|
|
"""Generate unique user ID"""
|
|
|
return f"user_{secrets.token_hex(16)}"
|
|
|
|
|
|
@staticmethod
|
|
|
def format_mobile_number(mobile_number: str) -> str:
|
|
|
"""Format mobile number for consistent storage"""
|
|
|
|
|
|
digits_only = re.sub(r"\D", "", mobile_number)
|
|
|
|
|
|
|
|
|
if not digits_only.startswith("+"):
|
|
|
|
|
|
if len(digits_only) == 10:
|
|
|
digits_only = f"1{digits_only}"
|
|
|
|
|
|
return f"+{digits_only}"
|
|
|
|
|
|
@staticmethod
|
|
|
def create_session(user: User, duration_hours: int = 24) -> UserSession:
|
|
|
"""Create a new user session"""
|
|
|
session_id = UserAuth.generate_session_id()
|
|
|
created_at = datetime.utcnow()
|
|
|
expires_at = created_at + timedelta(hours=duration_hours)
|
|
|
|
|
|
return UserSession(
|
|
|
session_id=session_id,
|
|
|
user_id=user.id,
|
|
|
mobile_number=user.mobile_number,
|
|
|
full_name=user.full_name,
|
|
|
created_at=created_at,
|
|
|
expires_at=expires_at,
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class AuthResponse(BaseModel):
|
|
|
"""Authentication response model"""
|
|
|
|
|
|
success: bool
|
|
|
message: str
|
|
|
session_id: Optional[str] = None
|
|
|
user_id: Optional[str] = None
|
|
|
full_name: Optional[str] = None
|
|
|
|
|
|
|
|
|
class UserProfile(BaseModel):
|
|
|
"""User profile response model"""
|
|
|
|
|
|
user_id: str
|
|
|
full_name: str
|
|
|
mobile_number: str
|
|
|
avatar_url: Optional[str] = None
|
|
|
created_at: Optional[str] = None
|
|
|
|
|
|
@staticmethod
|
|
|
def mask_mobile_number(mobile_number: str) -> str:
|
|
|
"""Mask mobile number for security (show only last 4 digits)"""
|
|
|
if len(mobile_number) <= 4:
|
|
|
return "*" * len(mobile_number)
|
|
|
return "*" * (len(mobile_number) - 4) + mobile_number[-4:]
|
|
|
|