richlai's picture
add files
8b1e853
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import sqlite3
import os
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 300
pwd_context = CryptContext(schemes=["django_pbkdf2_sha256"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
auth_router = APIRouter()
class User(BaseModel):
username: str
class Token(BaseModel):
access_token: str
token_type: str
# SQLite setup
DB_NAME = "users.db"
def init_db():
conn = sqlite3.connect(DB_NAME)
cur = conn.cursor()
cur.execute('''CREATE TABLE IF NOT EXISTS users
(username TEXT PRIMARY KEY, hashed_password TEXT)''')
# Add sample users if they don't exist
cur.execute("INSERT OR IGNORE INTO users VALUES (?, ?)",
("admin", pwd_context.hash("admin")))
conn.commit()
conn.close()
# Initialize the database
init_db()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(username: str):
conn = sqlite3.connect(DB_NAME)
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE username=?", (username,))
user = cur.fetchone()
conn.close()
if user:
return User(username=user[0])
return None
def authenticate_user(username: str, password: str):
conn = sqlite3.connect(DB_NAME)
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE username=?", (username,))
user = cur.fetchone()
conn.close()
if not user:
return False
if not verify_password(password, user[1]):
return False
return User(username=user[0])
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@auth_router.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
user = get_user(username)
if user is None:
raise credentials_exception
return user
except JWTError:
raise credentials_exception