from fastapi import APIRouter, HTTPException, status, Depends from pydantic import BaseModel, EmailStr from sqlmodel import Session, select from passlib.context import CryptContext from jose import jwt from datetime import datetime, timedelta from ..models.user import User from ..database import get_session from ..config import settings router = APIRouter(prefix="/api/auth", tags=["Authentication"]) # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class SignupRequest(BaseModel): email: EmailStr password: str class SigninRequest(BaseModel): email: EmailStr password: str class AuthResponse(BaseModel): access_token: str token_type: str = "bearer" user: dict def hash_password(password: str) -> str: """Hash a password using bcrypt.""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return pwd_context.verify(plain_password, hashed_password) def create_access_token(user_id: int, email: str) -> str: """Create a JWT access token.""" expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRATION_HOURS) to_encode = { "sub": str(user_id), "email": email, "exp": expire } encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) return encoded_jwt @router.post("/signup", response_model=AuthResponse) async def signup( request: SignupRequest, session: Session = Depends(get_session) ): """ Register a new user. - Validates email uniqueness - Hashes password - Creates user in database - Returns JWT token """ # Check if user already exists existing_user = session.exec( select(User).where(User.email == request.email) ).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # Create new user hashed_password = hash_password(request.password) new_user = User( email=request.email, password_hash=hashed_password ) session.add(new_user) session.commit() session.refresh(new_user) # Create access token access_token = create_access_token(new_user.id, new_user.email) return AuthResponse( access_token=access_token, user={ "id": new_user.id, "email": new_user.email } ) @router.post("/signin", response_model=AuthResponse) async def signin( request: SigninRequest, session: Session = Depends(get_session) ): """ Authenticate a user. - Validates credentials - Returns JWT token """ # Find user by email user = session.exec( select(User).where(User.email == request.email) ).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password" ) # Verify password if not verify_password(request.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password" ) # Create access token access_token = create_access_token(user.id, user.email) return AuthResponse( access_token=access_token, user={ "id": user.id, "email": user.email } )