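# This file is responsible for signing, encoding, decoding and returning JWTs.
import base64
import logging
import secrets
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

import jwt
from decouple import config
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from sqlalchemy.orm import joinedload, Session

# NOTE(review): both signing keys are committed in source, so anyone who can
# read this file can forge access and refresh tokens. Treat these values as
# exposed: rotate them and load the replacements from the environment (the
# `config` helper from decouple is already imported and would serve for that).
# Rotating the keys invalidates every token issued so far.
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"  # signs access tokens
JWT_ALGORITHM = "HS256"
SECRET_KEY = "8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"  # signs refresh tokens

# Token lifetimes shared by signJWT and returnAccessToken.
_ACCESS_TOKEN_TTL = timedelta(minutes=180)
_REFRESH_TOKEN_TTL = timedelta(days=30)


def unique_string(byte: int = 8) -> str:
    """Return a URL-safe random string generated from `byte` random bytes."""
    return secrets.token_urlsafe(byte)


def token_response(token: str):
    """Wrap `token` in the {"access_token": ...} response shape."""
    return {"access_token": token}


def str_encode(string: str) -> str:
    """Return `string` as base85 text.

    This is an encoding, not encryption: whoever holds a token can recover
    the original value. UTF-8 is used so non-ASCII input no longer raises;
    for ASCII input the result is unchanged.
    """
    return base64.b85encode(string.encode("utf-8")).decode("ascii")


def str_decode(string: str) -> str:
    """Reverse `str_encode`; raises ValueError on input that is not valid base85."""
    return base64.b85decode(string.encode("ascii")).decode("utf-8")


def get_token_payload(token: str, secret: str, algo: str):
    """Verify `token` with `secret` and return its claims, or None on any failure.

    Failures (bad signature, expired token, malformed input) are logged at
    debug level and never raised to the caller.
    """
    # PyJWT takes a collection of allowed algorithms. With a bare string the
    # allow-list check would become a substring test, so pass an exact list.
    allowed = [algo] if isinstance(algo, str) else list(algo)
    try:
        return jwt.decode(token, secret, algorithms=allowed)
    except Exception as jwt_exec:
        logging.debug("JWT Error: %s", jwt_exec)
        return None


def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
    """Return a JWT carrying `payload` plus an "exp" claim set `expiry` from now."""
    # Work on a copy so the caller's dict is not modified as a side effect.
    claims = dict(payload)
    claims["exp"] = datetime.now(timezone.utc) + expiry
    return jwt.encode(claims, secret, algorithm=algo)


def _issue_access_token(user_email: str):
    """Sign an access token for `user_email`; return (token, access_key)."""
    access_key = unique_string(50)
    at_payload = {"sub": str_encode(str(user_email)), "a": access_key}
    token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, _ACCESS_TOKEN_TTL)
    return token, access_key


# function used for signing the JWT string
def signJWT(user_email: str) -> Dict[str, object]:
    """Issue a new access/refresh token pair for `user_email`.

    The refresh token is signed with SECRET_KEY and carries the same "a" value
    as the access token plus its own random "t" value. "expires_in" is the
    access-token lifetime in seconds.
    """
    access_token, access_key = _issue_access_token(user_email)
    rt_payload = {
        "sub": str_encode(str(user_email)),
        "t": unique_string(100),
        "a": access_key,
    }
    refresh_token = generate_token(
        rt_payload, SECRET_KEY, JWT_ALGORITHM, _REFRESH_TOKEN_TTL
    )
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        # total_seconds() rather than .seconds, which drops whole days.
        "expires_in": int(_ACCESS_TOKEN_TTL.total_seconds()),
    }


def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, object]:
    """Issue a fresh access token and hand back `refresh_token` unchanged.

    `refresh_token` is not verified here; the caller is expected to have
    validated it first (get_refresh_token does).
    """
    access_token, _ = _issue_access_token(user_email)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": int(_ACCESS_TOKEN_TTL.total_seconds()),
    }


def decodeJWT(token: str) -> Optional[dict]:
    """Verify an access token and return its claims.

    Returns {} when verification fails or "exp" is missing, and None when
    "exp" lies in the past. Both are falsy, so callers should test the result
    for truthiness.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return decoded_token if decoded_token["exp"] >= time.time() else None
    except Exception as jwt_exec:
        # Narrower than a bare except, which would also trap KeyboardInterrupt
        # and SystemExit.
        logging.debug("JWT Error: %s", jwt_exec)
        return {}


def get_refresh_token(refresh_token, token_now, email):
    """Exchange a valid refresh token for a new access token.

    Raises HTTPException(400) when the refresh token fails verification, has
    no usable expiry, is expired, or was issued for a different subject than
    `email`. `token_now` is accepted for interface compatibility and unused.
    """
    token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
    if not token_payload:
        raise HTTPException(status_code=400, detail="Invalid Request.")

    # Bind the request to the token: without this check a refresh token issued
    # for one account would mint access tokens for whatever `email` is passed.
    sub = token_payload.get("sub")
    try:
        subject = str_decode(sub) if isinstance(sub, str) else None
    except ValueError:
        subject = None
    if subject is None or subject != str(email):
        raise HTTPException(status_code=400, detail="Invalid Request.")

    # An expired refresh token is rejected rather than swapped for a brand-new
    # token pair, which would make the refresh-token expiry meaningless.
    exp = token_payload.get("exp")
    if exp is None or exp < time.time():
        raise HTTPException(status_code=400, detail="Invalid Request.")

    return returnAccessToken(email, refresh_token)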