KLTN / auth_hander.py
ohhhchank3's picture
Update auth_hander.py
5f0a03f verified
# This file is responsible for signing , encoding , decoding and returning JWTS
import time
from typing import Dict
import jwt
from decouple import config
import secrets
import logging
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
import jwt
from passlib.context import CryptContext
import base64
from sqlalchemy.orm import joinedload, Session
from datetime import datetime, timedelta
def unique_string(byte: int = 8) -> str:
return secrets.token_urlsafe(byte)
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
JWT_ALGORITHM = "HS256"
SECRET_KEY="8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"
def token_response(token: str):
return {
"access_token": token
}
def str_encode(string: str) -> str:
return base64.b85encode(string.encode('ascii')).decode('ascii')
import base64
from datetime import datetime, timedelta
def get_token_payload(token: str, secret: str, algo: str):
try:
payload = jwt.decode(token, secret, algorithms=algo)
except Exception as jwt_exec:
logging.debug(f"JWT Error: {str(jwt_exec)}")
payload = None
return payload
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
expire = datetime.utcnow() + expiry
payload.update({"exp": expire})
return jwt.encode(payload, secret, algorithm=algo)
def str_decode(string: str) -> str:
return base64.b85decode(string.encode('ascii')).decode('ascii')
# function used for signing the JWT string
def signJWT(user_email: str) -> Dict[str, str]:
rt_expires = timedelta(days=30)
refresh_key = unique_string(100)
access_key = unique_string(50)
payload = {
"user_email": user_email,
}
at_expires = timedelta(minutes=180)
#access_token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
at_payload = {
"sub": str_encode(str(user_email)),
'a': access_key,
}
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": at_expires.seconds
}
def returnAccessToken(user_email: str,refresh_token: str) -> Dict[str, str]:
rt_expires = timedelta(days=30)
refresh_key = unique_string(100)
access_key = unique_string(50)
payload = {
"user_email": user_email,
}
at_expires = timedelta(minutes=180)
#access_token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
at_payload = {
"sub": str_encode(str(user_email)),
'a': access_key,
}
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
return {
"access_token": access_token,
"refresh_token" : refresh_token,
"expires_in": at_expires.seconds
}
def decodeJWT(token: str) -> dict:
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return decoded_token if decoded_token["exp"] >= time.time() else None
except:
return {}
def get_refresh_token(refresh_token,token_now, email):
token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
if not token_payload:
raise HTTPException(status_code=400, detail="Invalid Request.")
refresh_key = token_payload.get('t')
access_key = token_payload.get('a')
exp = token_payload.get('exp')
#decoded_token = jwt.decode(refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
if exp >= time.time():
return returnAccessToken(email,refresh_token)
else:
return signJWT(email)