Spaces:
Sleeping
Sleeping
# This file is responsible for signing , encoding , decoding and returning JWTS | |
import time | |
from typing import Dict | |
import jwt | |
from decouple import config | |
import secrets | |
import logging | |
from fastapi import Depends, HTTPException | |
from fastapi.security import OAuth2PasswordBearer | |
import jwt | |
from passlib.context import CryptContext | |
import base64 | |
from sqlalchemy.orm import joinedload, Session | |
from datetime import datetime, timedelta | |
def unique_string(byte: int = 8) -> str: | |
return secrets.token_urlsafe(byte) | |
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" | |
JWT_ALGORITHM = "HS256" | |
SECRET_KEY="8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1" | |
def token_response(token: str): | |
return { | |
"access_token": token | |
} | |
def str_encode(string: str) -> str: | |
return base64.b85encode(string.encode('ascii')).decode('ascii') | |
import base64 | |
from datetime import datetime, timedelta | |
def get_token_payload(token: str, secret: str, algo: str): | |
try: | |
payload = jwt.decode(token, secret, algorithms=algo) | |
except Exception as jwt_exec: | |
logging.debug(f"JWT Error: {str(jwt_exec)}") | |
payload = None | |
return payload | |
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta): | |
expire = datetime.utcnow() + expiry | |
payload.update({"exp": expire}) | |
return jwt.encode(payload, secret, algorithm=algo) | |
def str_decode(string: str) -> str: | |
return base64.b85decode(string.encode('ascii')).decode('ascii') | |
# function used for signing the JWT string | |
def signJWT(user_email: str) -> Dict[str, str]: | |
rt_expires = timedelta(days=30) | |
refresh_key = unique_string(100) | |
access_key = unique_string(50) | |
payload = { | |
"user_email": user_email, | |
} | |
at_expires = timedelta(minutes=180) | |
#access_token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) | |
at_payload = { | |
"sub": str_encode(str(user_email)), | |
'a': access_key, | |
} | |
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key} | |
refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires) | |
return { | |
"access_token": access_token, | |
"refresh_token": refresh_token, | |
"expires_in": at_expires.seconds | |
} | |
def returnAccessToken(user_email: str,refresh_token: str) -> Dict[str, str]: | |
rt_expires = timedelta(days=30) | |
refresh_key = unique_string(100) | |
access_key = unique_string(50) | |
payload = { | |
"user_email": user_email, | |
} | |
at_expires = timedelta(minutes=180) | |
#access_token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) | |
at_payload = { | |
"sub": str_encode(str(user_email)), | |
'a': access_key, | |
} | |
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
return { | |
"access_token": access_token, | |
"refresh_token" : refresh_token, | |
"expires_in": at_expires.seconds | |
} | |
def decodeJWT(token: str) -> dict: | |
try: | |
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) | |
return decoded_token if decoded_token["exp"] >= time.time() else None | |
except: | |
return {} | |
def get_refresh_token(refresh_token,token_now, email): | |
token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM) | |
if not token_payload: | |
raise HTTPException(status_code=400, detail="Invalid Request.") | |
refresh_key = token_payload.get('t') | |
access_key = token_payload.get('a') | |
exp = token_payload.get('exp') | |
#decoded_token = jwt.decode(refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) | |
if exp >= time.time(): | |
return returnAccessToken(email,refresh_token) | |
else: | |
return signJWT(email) |