fuzik / app /routers /authentication.py
date3k2's picture
Add new files and models
d2726bc
raw
history blame
No virus
3.54 kB
from fastapi import APIRouter, Depends, HTTPException, status, Security, Body, Query
from fastapi.security import OAuth2PasswordRequestForm
from models.user import UserSignup
from db.supabase_service import get_supabase
from typing import Annotated
from supabase import Client
from utils.auth import get_id
from fastapi.encoders import jsonable_encoder
from models.enums import Role
from gotrue.errors import AuthApiError
from crud.user import update_user
from utils.exceptions import BAD_REQUEST, CONFLICT, NOT_FOUND
router = APIRouter(tags=["Authentication"])
@router.post("/login", description="Login user using email and password")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
supabase: Annotated[Client, Depends(get_supabase)],
):
email = form_data.username
password = form_data.password
try:
user = supabase.auth.sign_in_with_password(
{"email": email, "password": password}
)
return {
"access_token": user.session.access_token,
"token_type": "bearer",
}
except:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect email or password",
)
@router.post(
"/signup", description="Sign up new user", status_code=status.HTTP_201_CREATED
)
async def signup(
user: Annotated[UserSignup, Body()],
supabase: Annotated[Client, Depends(get_supabase)],
):
email = user.email
password = user.password
if len(password) < 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password should be at least 6 characters",
)
try:
user.role = Role.user
res = supabase.auth.sign_up(
{
"email": email,
"password": password,
"options": {
"data": jsonable_encoder(user, exclude=("password", "email"))
},
}
)
return {"detail": "User created"}
except AuthApiError:
raise CONFLICT
except:
raise BAD_REQUEST
@router.post("/reset_password", description="Send reset password email")
async def reset_password(
supabase: Annotated[Client, Depends(get_supabase)],
email: str,
):
try:
if (
len(supabase.rpc("get_user_id_by_email", {"email": email}).execute().data)
> 0
):
supabase.auth.reset_password_email(email)
return {"detail": "Reset password email sent"}
else:
raise NOT_FOUND
except:
raise NOT_FOUND
@router.post("/reset_password_confirm", description="Reset password")
async def reset_password_confirm(
supabase: Annotated[Client, Depends(get_supabase)],
email: Annotated[str, Query(description="Email", title="Email")],
new_password: Annotated[
str, Query(min_length=6, description="New password", title="New password")
],
token: Annotated[
str, Query(description="Reset password token", title="Reset password token")
],
):
try:
supabase.auth.verify_otp(
{
"email": email,
"token": token,
"type": "email",
}
)
await update_user(supabase, password=new_password)
return {"detail": "Your password has been reset"}
except:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect token",
)