TaskFlow / src /api /auth.py
BilalCode's picture
taskflow todo app
310260a
Raw
History Blame Contribute Delete
3.52 kB
from fastapi import APIRouter, HTTPException, status, Depends
from pydantic import BaseModel, EmailStr
from sqlmodel import Session, select
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from ..models.user import User
from ..database import get_session
from ..config import settings
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class SignupRequest(BaseModel):
email: EmailStr
password: str
class SigninRequest(BaseModel):
email: EmailStr
password: str
class AuthResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: dict
def hash_password(password: str) -> str:
"""Hash a password using bcrypt."""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(user_id: int, email: str) -> str:
"""Create a JWT access token."""
expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRATION_HOURS)
to_encode = {
"sub": str(user_id),
"email": email,
"exp": expire
}
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
return encoded_jwt
@router.post("/signup", response_model=AuthResponse)
async def signup(
request: SignupRequest,
session: Session = Depends(get_session)
):
"""
Register a new user.
- Validates email uniqueness
- Hashes password
- Creates user in database
- Returns JWT token
"""
# Check if user already exists
existing_user = session.exec(
select(User).where(User.email == request.email)
).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
hashed_password = hash_password(request.password)
new_user = User(
email=request.email,
password_hash=hashed_password
)
session.add(new_user)
session.commit()
session.refresh(new_user)
# Create access token
access_token = create_access_token(new_user.id, new_user.email)
return AuthResponse(
access_token=access_token,
user={
"id": new_user.id,
"email": new_user.email
}
)
@router.post("/signin", response_model=AuthResponse)
async def signin(
request: SigninRequest,
session: Session = Depends(get_session)
):
"""
Authenticate a user.
- Validates credentials
- Returns JWT token
"""
# Find user by email
user = session.exec(
select(User).where(User.email == request.email)
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
# Verify password
if not verify_password(request.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
# Create access token
access_token = create_access_token(user.id, user.email)
return AuthResponse(
access_token=access_token,
user={
"id": user.id,
"email": user.email
}
)