iLOVE2D's picture
Upload 2846 files
5374a2d verified
"""
Security components for authentication and authorization.
"""
from datetime import datetime, timedelta
from typing import Optional, Dict, Any # , List

import jwt
from bson import ObjectId
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from pydantic import BaseModel, Field, ValidationError
from pymongo.errors import DuplicateKeyError

from evoagentx.app.config import settings
from evoagentx.app.db import Database
from evoagentx.app.schemas import TokenPayload, UserCreate, UserResponse
# Password hashing: shared passlib context used by verify_password() and
# get_password_hash() below. bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 bearer-token scheme used as the FastAPI dependency in
# get_current_user(). tokenUrl points at the login endpoint under
# settings.API_PREFIX.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_PREFIX}/auth/login")
# User model for database
class UserInDB(BaseModel):
    """User record mirroring the documents written to the ``users`` collection.

    The field names match the keys of the dictionaries inserted by
    ``create_user`` and ``init_users_collection`` in this module.
    """

    # NOTE(review): pydantic does not treat underscore-prefixed names as
    # regular fields -- confirm this attribute behaves as intended before
    # relying on it for the Mongo document id.
    _id: Optional[ObjectId] = None
    email: str
    hashed_password: str
    full_name: Optional[str] = None
    is_active: bool = True
    is_admin: bool = False
    # default_factory is called for every new instance. A plain
    # ``= datetime.utcnow()`` default would be evaluated once, when the class
    # is defined, and every user would share that import-time timestamp.
    created_at: datetime = Field(default_factory=datetime.utcnow)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The password as supplied by the caller.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Produce the hash of ``password`` that is stored instead of the plaintext.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    digest = pwd_context.hash(password)
    return digest
async def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Look up a single user document by its ``email`` field.

    Args:
        email: The e-mail address to search for.

    Returns:
        Whatever ``find_one`` yields for the ``users`` collection: the matching
        document, or ``None`` when there is no match.
    """
    query = {"email": email}
    users = Database.db.users
    return await users.find_one(query)
async def authenticate_user(email: str, password: str) -> Optional[Dict[str, Any]]:
    """Authenticate a user by email and password.

    Args:
        email: The e-mail address identifying the account.
        password: The plaintext password to check.

    Returns:
        The user document when the account exists, the password matches the
        stored hash and the account is active; ``None`` in every other case.
    """
    user = await get_user_by_email(email)
    if not user:
        # Spend roughly the same time as a real verification so the response
        # time does not reveal whether the e-mail address is registered.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user["hashed_password"]):
        return None
    # Documents without an "is_active" key are treated as active.
    if not user.get("is_active", True):
        return None
    return user
async def create_user(user_create: UserCreate) -> UserResponse:
    """Register a new, active, non-admin user.

    Args:
        user_create: The submitted registration data (email, password and
            optionally a full name).

    Returns:
        A ``UserResponse`` built from the stored document, including the
        ``_id`` assigned by the insert.

    Raises:
        HTTPException: 400 when the e-mail address is already registered,
            whether detected by the up-front lookup or by the insert itself.
    """
    # Cheap rejection before any hashing work is done.
    if await get_user_by_email(user_create.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Only the hash is persisted; the plaintext is popped out of the payload.
    payload = user_create.dict()
    password_hash = get_password_hash(payload.pop("password"))
    document = dict(
        email=payload["email"],
        hashed_password=password_hash,
        full_name=payload.get("full_name"),
        is_active=True,
        is_admin=False,
        created_at=datetime.utcnow(),
    )

    try:
        result = await Database.db.users.insert_one(document)
        document["_id"] = result.inserted_id
        return UserResponse(**document)
    except DuplicateKeyError:
        # The insert can still collide if the address was registered after
        # the lookup above.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
def create_access_token(subject: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a new JWT access token.

    Args:
        subject: Value stored in the token's ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The token encoded with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, and a plain
    # truthiness test would silently replace it with the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    expire = datetime.utcnow() + expires_delta
    to_encode = {"exp": expire, "sub": subject}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
    """Get the current user from a JWT token.

    Args:
        token: The bearer token supplied through ``oauth2_scheme``.

    Returns:
        The user document whose email equals the token's ``sub`` claim.

    Raises:
        HTTPException: 403 when the token cannot be decoded or its payload
            does not validate as ``TokenPayload``; 401 when the ``exp`` claim
            lies in the past; 404 when no user matches the ``sub`` claim.
    """
    # Only decoding and payload validation can raise the errors handled here,
    # so nothing else is kept inside the try body.
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (jwt.PyJWTError, ValidationError) as exc:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # Compare UTC with UTC. datetime.fromtimestamp() would return local time,
    # and comparing that with utcnow() rejects valid tokens on hosts whose
    # local clock is behind UTC.
    if datetime.utcfromtimestamp(token_data.exp) < datetime.utcnow():
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = await get_user_by_email(token_data.sub)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user
async def get_current_active_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
    """Return the authenticated user, provided the account is active.

    Args:
        current_user: The user document resolved by ``get_current_user``.

    Returns:
        ``current_user`` unchanged. A document without an ``is_active`` key
        counts as active.

    Raises:
        HTTPException: 400 when ``is_active`` is falsy.
    """
    is_active = current_user.get("is_active", True)
    if is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
async def get_current_admin_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
    """Get the current admin user.

    Args:
        current_user: The user document resolved by ``get_current_user``.

    Returns:
        ``current_user`` unchanged when it is an active admin account.

    Raises:
        HTTPException: 403 when ``is_admin`` is missing or falsy; 400 when the
            admin account has been deactivated.
    """
    if not current_user.get("is_admin", False):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )
    # This dependency is built on get_current_user, which does not look at
    # "is_active". Without this check a deactivated admin holding a still-valid
    # token would keep admin access. Same response as get_current_active_user.
    if not current_user.get("is_active", True):
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
# Initialize the users collection
async def init_users_collection(
    *,
    admin_email: str = "admin@clayx.ai",
    admin_password: str = "adminpassword",
    admin_full_name: str = "Admin User",
) -> None:
    """Initialize the users collection with indexes and a bootstrap admin.

    Creates a unique index on ``email`` and, when no user with
    ``admin_email`` exists yet, inserts an active admin account.

    Args:
        admin_email: E-mail address of the bootstrap admin account.
        admin_password: Plaintext password of the bootstrap admin. The default
            is a well-known placeholder -- change this in production!
        admin_full_name: Display name stored for the bootstrap admin.
    """
    await Database.db.users.create_index("email", unique=True)

    # Create admin user if it doesn't exist
    if await get_user_by_email(admin_email):
        return

    # Build a UserCreate first so the bootstrap values pass through the same
    # schema as regular sign-ups.
    admin_user = UserCreate(
        email=admin_email,
        password=admin_password,
        full_name=admin_full_name,
    )
    user_dict = admin_user.dict()
    new_admin = {
        "email": admin_email,
        "hashed_password": get_password_hash(user_dict["password"]),
        "full_name": admin_full_name,
        "is_active": True,
        "is_admin": True,
        "created_at": datetime.utcnow(),
    }
    try:
        await Database.db.users.insert_one(new_admin)
    except DuplicateKeyError:
        # Another process inserted the admin between the lookup and this
        # insert; the unique index rejected ours and the account exists, so
        # there is nothing left to do.
        return