#auth.py from fastapi import Depends, HTTPException, Form, status from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel from sqlalchemy.orm import Session from database import get_db, get_user_by_email # Import database functions from models import User, VerificationToken import jwt from passlib.context import CryptContext from datetime import datetime, timedelta import os yoursecretkey = os.environ['my_secret_key'] class AuthViews: def __init__(self): self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") self.SECRET_KEY = yoursecretkey # Replace with your actual secret key self.ALGORITHM = "HS256" self.ACCESS_TOKEN_EXPIRE_MINUTES = 30 def create_access_token(self, data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, self.SECRET_KEY, algorithm=self.ALGORITHM) return encoded_jwt auth_views = AuthViews() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") def verify_token(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, auth_views.SECRET_KEY, algorithms=[auth_views.ALGORITHM]) return payload.get("sub") except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token has expired") except jwt.PyJWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) class UserCreate(BaseModel): username: str email: str password: str confirm_password: str class TokenData(BaseModel): token: str # Use auth_views.pwd_context to access the password context from the AuthViews instance def authenticate_user(db: Session, email: str, password: str): user = get_user_by_email(db, email) if user is None: raise HTTPException(status_code=400, detail="User not found") # Use the pwd_context from the auth_views instance if not auth_views.pwd_context.verify(password, user.hashed_password): raise HTTPException(status_code=400, detail="Incorrect password") return user from emailx import send_verification_email, generate_verification_token def get_user_by_verification_token(db: Session, verification_token: str): return db.query(User).filter(User.email_verification_token == verification_token).first() def verify_email(verification_token: str, db: Session = Depends(get_db)): # Verify the email using the token user = get_user_by_verification_token(db, verification_token) if not user: raise HTTPException(status_code=400, detail="Invalid verification token") if user.is_verified: raise HTTPException(status_code=400, detail="Email already verified") # Mark the email as verified user.is_verified = True user.email_verification_token = None # Optionally clear the verification token db.commit() return {"message": "Email verification successful"} def register(user: UserCreate, db: Session): # Validate email format and check for existing users db_user = get_user_by_email(db, user.email) if db_user: raise HTTPException(status_code=400, detail="Email already registered") # Hash the password hashed_password = auth_views.pwd_context.hash(user.password) # Generate a verification token verification_token = generate_verification_token(user.email) reset_link = f"https://gregniuki-loginauth.hf.space/verify?token={verification_token}" # Send a verification email send_verification_email(user.email, reset_link) # Verify the email # verify_email(verification_token, db) # Create the user in the database # Set the email_verification_token field in the User model user_in_db = User( email=user.email, username=user.username, # Set the username here hashed_password=hashed_password, email_verification_token=verification_token ) db.add(user_in_db) db.commit() db.refresh(user_in_db) return user_in_db def resetpassword(user: User, db: Session): # Generate a verification token verification_token = generate_verification_token(user.email) # Send a verification email with reset linkhttps://gregniuki-loginauth.hf.space/verify/{verification_token} reset_link = f"https://gregniuki-loginauth.hf.space/reset-password?token={verification_token}" send_verification_email(user.email, reset_link) # Update the user's email verification token user.email_verification_token = verification_token db.commit() def get_current_user(token: str = Depends(verify_token)): if not token: raise HTTPException(status_code=401, detail="Token not valid") return token