Spaces:
Running
Running
import jwt | |
from hashlib import sha256 | |
from fastapi import HTTPException, status, Request, Response | |
from . import log_module, settings | |
from datetime import datetime, timedelta | |
from Crypto.Signature import pss | |
from Crypto.Hash import SHA256 | |
from Crypto.PublicKey import RSA | |
import base64 | |
for key in settings.USERS: | |
if key == "master": continue | |
password = key+settings.USERS[key]+settings.USERS["master"] | |
settings.USERS[key] = sha256(password.encode('UTF-8')).hexdigest() | |
def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API): | |
expire = datetime.utcnow() + timedelta(minutes=maxlife) | |
to_encode = data | {"exp": expire} | |
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) | |
return encoded_jwt | |
def validate_jwt_token(token: str, usage: str = "api"): | |
try: | |
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) | |
return payload | |
except Exception as e: | |
log_module.logger().error(repr(e) + " - Invalid token: " + str(token)) | |
if usage == "view": | |
return raise_307("Failed validating jwt token") | |
return raise_401("Failed validating jwt token") | |
def token_from_cookie(request:Request): | |
if token := request.cookies.get('token', ""): | |
return validate_jwt_token(token, "view") | |
return raise_307("Token not found") | |
def token_from_headers(request:Request): | |
if (bearer := request.headers.get("Autorization", " ").split(" ",1)) and bearer[0] == "Bearer": | |
return validate_jwt_token(bearer[1]) | |
return raise_401("Bearer malformed") | |
def can_use(role, activity): | |
can = { | |
"chat": ["user", "admin"] | |
} | |
return role in can[activity] | |
def raise_401(detail:str): | |
headers = {"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", | |
"Location": "/login"} | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail=detail, | |
headers=headers | |
) | |
def raise_307(detail:str): | |
headers = { | |
"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", | |
#"Location": "/login" | |
} | |
raise HTTPException( | |
status_code=status.HTTP_307_TEMPORARY_REDIRECT, | |
detail=detail, | |
headers=headers | |
) | |
def validate_signature(public_key: str, signature: str, data:str) -> bool: | |
public_key = RSA.import_key(public_key) | |
signature = base64.b64decode(signature) | |
data_ = SHA256.new(data.encode()) | |
try: | |
pss.new(public_key).verify(data_, signature) | |
return True | |
except ValueError: | |
raise_401("Signature failed") | |
def sha256(data:str|bytes) -> str: | |
data_ = data if isinstance(data, bytes) else data.encode() | |
return SHA256.new(data_).hexdigest() | |
def set_cookie(response: Response, key: str, value: str, expire_time: dict = {"days":7}): | |
expires = datetime.now(tz=settings.TZ) + timedelta(**expire_time) | |
response.set_cookie( | |
key=key, | |
value=value, | |
# httponly=True, | |
# samesite='none', | |
expires=expires.strftime("%a, %d %b %Y %H:%M:%S %Z"), | |
#domain='.<YOUR DOMAIN>' | |
) |