Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, Depends, HTTPException, status | |
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
from jose import JWTError, jwt | |
from passlib.context import CryptContext | |
from pydantic import BaseModel | |
from datetime import datetime, timedelta, timezone | |
import sqlite3 | |
import os | |
SECRET_KEY = "your-secret-key" | |
ALGORITHM = "HS256" | |
ACCESS_TOKEN_EXPIRE_MINUTES = 300 | |
pwd_context = CryptContext(schemes=["django_pbkdf2_sha256"], deprecated="auto") | |
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | |
auth_router = APIRouter() | |
class User(BaseModel): | |
username: str | |
class Token(BaseModel): | |
access_token: str | |
token_type: str | |
# SQLite setup | |
DB_NAME = "users.db" | |
def init_db(): | |
conn = sqlite3.connect(DB_NAME) | |
cur = conn.cursor() | |
cur.execute('''CREATE TABLE IF NOT EXISTS users | |
(username TEXT PRIMARY KEY, hashed_password TEXT)''') | |
# Add sample users if they don't exist | |
cur.execute("INSERT OR IGNORE INTO users VALUES (?, ?)", | |
("admin", pwd_context.hash("admin"))) | |
conn.commit() | |
conn.close() | |
# Initialize the database | |
init_db() | |
def verify_password(plain_password, hashed_password): | |
return pwd_context.verify(plain_password, hashed_password) | |
def get_user(username: str): | |
conn = sqlite3.connect(DB_NAME) | |
cur = conn.cursor() | |
cur.execute("SELECT * FROM users WHERE username=?", (username,)) | |
user = cur.fetchone() | |
conn.close() | |
if user: | |
return User(username=user[0]) | |
return None | |
def authenticate_user(username: str, password: str): | |
conn = sqlite3.connect(DB_NAME) | |
cur = conn.cursor() | |
cur.execute("SELECT * FROM users WHERE username=?", (username,)) | |
user = cur.fetchone() | |
conn.close() | |
if not user: | |
return False | |
if not verify_password(password, user[1]): | |
return False | |
return User(username=user[0]) | |
def create_access_token(data: dict): | |
to_encode = data.copy() | |
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
to_encode.update({"exp": expire}) | |
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
return encoded_jwt | |
async def login(form_data: OAuth2PasswordRequestForm = Depends()): | |
user = authenticate_user(form_data.username, form_data.password) | |
if not user: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Incorrect username or password", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
access_token = create_access_token(data={"sub": user.username}) | |
return {"access_token": access_token, "token_type": "bearer"} | |
async def get_current_user(token: str = Depends(oauth2_scheme)): | |
credentials_exception = HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Could not validate credentials", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
try: | |
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
username: str = payload.get("sub") | |
if username is None: | |
raise credentials_exception | |
user = get_user(username) | |
if user is None: | |
raise credentials_exception | |
return user | |
except JWTError: | |
raise credentials_exception |