import jwt from hashlib import sha256 from fastapi import HTTPException, status, Request, Response from . import log_module, settings from datetime import datetime, timedelta from Crypto.Signature import pss from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA import base64 for key in settings.USERS: if key == "master": continue password = key+settings.USERS[key]+settings.USERS["master"] settings.USERS[key] = sha256(password.encode('UTF-8')).hexdigest() def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API): expire = datetime.utcnow() + timedelta(minutes=maxlife) to_encode = data | {"exp": expire} encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) return encoded_jwt def validate_jwt_token(token: str, usage: str = "api"): try: payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) return payload except Exception as e: log_module.logger().error(repr(e) + " - Invalid token: " + str(token)) if usage == "view": return raise_307("Failed validating jwt token") return raise_401("Failed validating jwt token") def token_from_cookie(request:Request): if token := request.cookies.get('token', ""): return validate_jwt_token(token, "view") return raise_307("Token not found") def token_from_headers(request:Request): if (bearer := request.headers.get("Autorization", " ").split(" ",1)) and bearer[0] == "Bearer": return validate_jwt_token(bearer[1]) return raise_401("Bearer malformed") def can_use(role, activity): can = { "chat": ["user", "admin"] } return role in can[activity] def raise_401(detail:str): headers = {"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", "Location": "/login"} raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, headers=headers ) def raise_307(detail:str): headers = { "set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", #"Location": "/login" } raise HTTPException( status_code=status.HTTP_307_TEMPORARY_REDIRECT, detail=detail, headers=headers ) def validate_signature(public_key: str, signature: str, data:str) -> bool: public_key = RSA.import_key(public_key) signature = base64.b64decode(signature) data_ = SHA256.new(data.encode()) try: pss.new(public_key).verify(data_, signature) return True except ValueError: raise_401("Signature failed") def sha256(data:str|bytes) -> str: data_ = data if isinstance(data, bytes) else data.encode() return SHA256.new(data_).hexdigest() def set_cookie(response: Response, key: str, value: str, expire_time: dict = {"days":7}): expires = datetime.now(tz=settings.TZ) + timedelta(**expire_time) response.set_cookie( key=key, value=value, # httponly=True, # samesite='none', expires=expires.strftime("%a, %d %b %Y %H:%M:%S %Z"), #domain='.' )