chgpt / modules /security.py
Miguel Diaz
Update assistant + google oauth + google search
9c5c050
import jwt
from hashlib import sha256
from fastapi import HTTPException, status, Request, Response
from . import log_module, settings
from datetime import datetime, timedelta
from Crypto.Signature import pss
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import base64
# Import-time side effect: every entry of ``settings.USERS`` except "master"
# is overwritten in place with a SHA-256 hex digest.
# At this point ``sha256`` is still the hashlib constructor imported above;
# the module-level ``sha256`` helper is only defined further down the file.
for key in settings.USERS:
    if key != "master":
        # Digest input: <entry name> + <entry value> + <"master" entry value>.
        password = key + settings.USERS[key] + settings.USERS["master"]
        settings.USERS[key] = sha256(password.encode("UTF-8")).hexdigest()
def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Dict of claims to embed in the token. It is not modified; a
            merged copy is encoded.
        maxlife: Token lifetime in minutes, counted from the moment of the
            call. The default is read from settings once, at import time.

    Returns:
        Whatever ``jwt.encode`` produces for the merged claims, signed with
        ``settings.JWT_SECRET`` using ``settings.JWT_ALGORITHM``.
    """
    from datetime import timezone

    # Use an aware UTC timestamp: ``datetime.utcnow()`` is deprecated and
    # yields a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(minutes=maxlife)
    to_encode = data | {"exp": expire}
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def validate_jwt_token(token: str, usage: str = "api"):
    """Decode *token* and return its payload, or reject the request.

    Args:
        token: Encoded JWT to verify against ``settings.JWT_SECRET``.
        usage: ``"view"`` makes a failure end in ``raise_307``; any other
            value makes it end in ``raise_401``.

    Returns:
        The decoded payload when ``jwt.decode`` succeeds.

    Raises:
        HTTPException: Via ``raise_307`` / ``raise_401`` when decoding fails
            for any reason.
    """
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except Exception as e:
        # Any decoding problem is logged and turned into an HTTP rejection.
        log_module.logger().error(f"{e!r} - Invalid token: {token}")
        reject = raise_307 if usage == "view" else raise_401
        return reject("Failed validating jwt token")
    else:
        return claims
def token_from_cookie(request:Request):
    """Validate the JWT stored in the ``token`` cookie of *request*.

    Returns the payload from ``validate_jwt_token`` (called in ``"view"``
    mode). A missing or empty cookie ends in ``raise_307``.
    """
    token = request.cookies.get('token', "")
    if not token:
        return raise_307("Token not found")
    return validate_jwt_token(token, "view")
def token_from_headers(request:Request):
    """Validate the bearer JWT sent in the request headers.

    The standard ``Authorization`` header is read first. The misspelled
    ``Autorization`` name that this function historically looked up is still
    accepted as a fallback so existing clients keep working.

    Returns:
        The payload from ``validate_jwt_token`` when the header has the form
        ``Bearer <token>``.

    Raises:
        HTTPException: Via ``raise_401`` when the header is absent, uses a
            scheme other than ``Bearer``, or carries no token.
    """
    header = request.headers.get("Authorization") or request.headers.get("Autorization", "")
    # partition() never raises on a value without a space, so a bare
    # "Bearer" is rejected with a 401 instead of failing with IndexError.
    scheme, _, token = header.partition(" ")
    if scheme == "Bearer" and token:
        return validate_jwt_token(token)
    return raise_401("Bearer malformed")
# Roles allowed to perform each activity. Built once instead of per call.
_ACTIVITY_ROLES = {
    "chat": ("user", "admin"),
}


def can_use(role, activity):
    """Return True when *role* is allowed to perform *activity*.

    Args:
        role: Role name to check.
        activity: Activity name to look up in the permission table.

    Returns:
        True if the role is listed for the activity. An activity that is not
        in the table is denied (False) rather than raising ``KeyError``.
    """
    return role in _ACTIVITY_ROLES.get(activity, ())
def raise_401(detail:str):
    """Abort the request with HTTP 401 and expire the ``token`` cookie.

    Always raises ``HTTPException``; it never returns. The response headers
    overwrite the ``token`` cookie with an already-expired value and carry a
    ``Location`` of ``/login``.
    """
    expired_token_cookie = "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"set-cookie": expired_token_cookie, "Location": "/login"},
    )
def raise_307(detail:str):
    """Abort the request with HTTP 307 and expire the ``token`` cookie.

    Always raises ``HTTPException``; it never returns.
    """
    expired_token_cookie = "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
    # NOTE(review): no "Location" header is sent here (it was disabled in the
    # original code); presumably the redirect target is supplied elsewhere,
    # e.g. by an exception handler. Confirm.
    raise HTTPException(
        status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        detail=detail,
        headers={"set-cookie": expired_token_cookie},
    )
def validate_signature(public_key: str, signature: str, data:str) -> bool:
    """Check an RSA-PSS signature over the SHA-256 digest of *data*.

    Args:
        public_key: RSA public key in a format ``RSA.import_key`` accepts.
        signature: Base64-encoded signature bytes.
        data: Signed text; it is encoded with ``str.encode()`` before hashing.

    Returns:
        True when the signature verifies.

    Raises:
        HTTPException: Via ``raise_401`` when the signature is not valid
            base64 or does not verify.
        ValueError: From ``RSA.import_key`` when the key itself cannot be
            parsed. This is deliberately not mapped to a 401.
    """
    public_key = RSA.import_key(public_key)
    digest = SHA256.new(data.encode())
    try:
        # Decoding happens inside the try block: malformed base64 raises
        # binascii.Error (a ValueError subclass) and must be answered with a
        # 401 like any other bad signature, not with an unhandled error.
        raw_signature = base64.b64decode(signature)
        pss.new(public_key).verify(digest, raw_signature)
    except ValueError:
        raise_401("Signature failed")
        # Fail closed should raise_401 ever stop raising.
        return False
    return True
def sha256(data:str|bytes) -> str:
    """Return the hexadecimal SHA-256 digest of *data*.

    Anything that is not ``bytes`` is encoded with its ``encode()`` method
    before hashing.

    Note: from this definition onwards the module-level name ``sha256``
    refers to this helper, not to the hashlib constructor imported at the
    top of the file.
    """
    if not isinstance(data, bytes):
        data = data.encode()
    return SHA256.new(data).hexdigest()
def set_cookie(response: Response, key: str, value: str, expire_time: dict | None = None):
    """Set a cookie on *response* that expires after *expire_time*.

    Args:
        response: Response object whose ``set_cookie`` method is called.
        key: Cookie name.
        value: Cookie value.
        expire_time: Keyword arguments for ``datetime.timedelta`` giving the
            cookie lifetime. Defaults to ``{"days": 7}``.
    """
    from datetime import timezone
    from email.utils import format_datetime

    # A fresh dict per call avoids sharing one mutable default between calls.
    lifetime = {"days": 7} if expire_time is None else expire_time
    expires = datetime.now(tz=settings.TZ) + timedelta(**lifetime)
    # Cookie expiry dates must be expressed in GMT. Formatting the local time
    # with "%Z" emits a local zone label (and locale-dependent day/month
    # names), which shifts or breaks the expiry on the client side.
    expires_gmt = format_datetime(expires.astimezone(timezone.utc), usegmt=True)
    response.set_cookie(
        key=key,
        value=value,
        # NOTE(review): httponly / samesite / domain are left at the
        # response object's defaults, as before. Consider enabling httponly
        # for token cookies.
        expires=expires_gmt,
    )