Spaces:
Running
Running
File size: 4,229 Bytes
fb22f47 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import logging
import uuid
import jwt
from datetime import UTC, datetime, timedelta
from typing import Optional, Union, List, Dict
from open_webui.apps.webui.models.users import Users
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import WEBUI_SECRET_KEY
from fastapi import Depends, HTTPException, Request, Response, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
logging.getLogger("passlib").setLevel(logging.ERROR)
SESSION_SECRET = WEBUI_SECRET_KEY
ALGORITHM = "HS256"
##############
# Auth Utils
##############
bearer_security = HTTPBearer(auto_error=False)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return (
pwd_context.verify(plain_password, hashed_password) if hashed_password else None
)
def get_password_hash(password):
return pwd_context.hash(password)
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
payload = data.copy()
if expires_delta:
expire = datetime.now(UTC) + expires_delta
payload.update({"exp": expire})
encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> Optional[dict]:
try:
decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
return decoded
except Exception:
return None
def extract_token_from_auth_header(auth_header: str):
return auth_header[len("Bearer ") :]
def create_api_key():
key = str(uuid.uuid4()).replace("-", "")
return f"sk-{key}"
def get_http_authorization_cred(auth_header: str):
try:
scheme, credentials = auth_header.split(" ")
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
except Exception:
raise ValueError(ERROR_MESSAGES.INVALID_TOKEN)
def get_current_user(
request: Request,
auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
):
token = None
if auth_token is not None:
token = auth_token.credentials
if token is None and "token" in request.cookies:
token = request.cookies.get("token")
if token is None:
raise HTTPException(status_code=403, detail="Not authenticated")
# auth by api key
if token.startswith("sk-"):
if not request.state.enable_api_key:
raise HTTPException(
status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.API_KEY_NOT_ALLOWED
)
return get_current_user_by_api_key(token)
# auth by jwt token
try:
data = decode_token(token)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
if data is not None and "id" in data:
user = Users.get_user_by_id(data["id"])
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.INVALID_TOKEN,
)
else:
Users.update_user_last_active_by_id(user.id)
return user
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
def get_current_user_by_api_key(api_key: str):
user = Users.get_user_by_api_key(api_key)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.INVALID_TOKEN,
)
else:
Users.update_user_last_active_by_id(user.id)
return user
def get_verified_user(user=Depends(get_current_user)):
if user.role not in {"user", "admin"}:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
return user
def get_admin_user(user=Depends(get_current_user)):
if user.role != "admin":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
return user
|