File size: 3,807 Bytes
90e28c8 420f2a2 846543a a8317c2 10f8fa9 87168f1 73ab5b2 f06079d 597f2fc 0e4af71 73ab5b2 0e4af71 597f2fc 420f2a2 7c9ef15 597f2fc 7c9ef15 597f2fc 7c9ef15 828d76a 50e8588 0bbc3b6 50e8588 828d76a 3ce2714 73ab5b2 1fcc6f4 4f9d6d3 7e59b23 73ab5b2 9034fa5 420f2a2 73ab5b2 7e59b23 73ab5b2 7e59b23 73ab5b2 4f9d6d3 420f2a2 73ab5b2 4f9d6d3 73ab5b2 274371f 420f2a2 73ab5b2 fe63148 73ab5b2 420f2a2 73ab5b2 420f2a2 73ab5b2 420f2a2 73ab5b2 420f2a2 73ab5b2 87168f1 420f2a2 87168f1 c96aa7a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
#auth.py
from fastapi import Depends, HTTPException, Form, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database import get_db, get_user_by_email # Import database functions
from models import User
import jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
class AuthViews:
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.SECRET_KEY = "your-secret-key" # Replace with your actual secret key
self.ALGORITHM = "HS256"
self.ACCESS_TOKEN_EXPIRE_MINUTES = 30
auth_views = AuthViews()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_token(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, auth_views.SECRET_KEY, algorithms=[auth_views.ALGORITHM])
return payload.get("sub")
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
class UserCreate(BaseModel):
username: str
email: str
password: str
confirm_password: str
def authenticate_user(db: Session, email: str, password: str):
# Check if the user with the provided email exists in the database
user = get_user_by_email(db, email)
if user is None:
return None # User not found
# Check if the provided password is correct (You should verify the password)
if not pwd_context.verify(password, user.hashed_password):
return None # Incorrect password
return user # Authentication succeeded
from emailx import send_verification_email, generate_verification_token
def register(user: UserCreate, db: Session):
# Validate email format and check for existing users
db_user = get_user_by_email(db, user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Hash the password
hashed_password = pwd_context.hash(user.password)
# Generate a verification token
verification_token = generate_verification_token(user.email)
# Send a verification email
send_verification_email(user.email, verification_token)
# Create the user in the database
user_in_db = User(email=user.email, hashed_password=hashed_password)
db.add(user_in_db)
db.commit()
db.refresh(user_in_db)
return user_in_db
def verify_email(self, verification_token: str, db: Session = Depends(get_db)):
# Verify the email using the token (implement email.verify_token)
email = email.verify_token(verification_token)
if not email:
raise HTTPException(status_code=400, detail="Invalid verification token")
# Get the user by email
user = database.get_user_by_email(db, email)
if not user:
raise HTTPException(status_code=400, detail="User not found")
if user.is_verified:
raise HTTPException(status_code=400, detail="Email already verified")
# Mark the email as verified
user.is_verified = True
db.commit()
return {"message": "Email verification successful"}
def get_current_user(token: str = Depends(verify_token)):
if not token:
raise HTTPException(status_code=401, detail="Token not valid")
return token
def create_access_token(self, data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, AuthViews().SECRET_KEY, algorithm=AuthViews().ALGORITHM)
return encoded_jwt
|