Spaces:
Sleeping
Sleeping
import time | |
from typing import Dict | |
import jwt | |
import secrets | |
import logging | |
from fastapi import Depends, HTTPException | |
import base64 | |
from datetime import datetime, timedelta | |
from repository import UserRepository, UserLoginRepository | |
import string, random | |
def check_token_is_valid(token): | |
check = UserRepository.getEmailUserByAccessToken(token) | |
if check is None: | |
return False | |
return True | |
def unique_string(byte: int = 8) -> str: | |
return secrets.token_urlsafe(byte) | |
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" | |
JWT_ALGORITHM = "HS512" | |
SECRET_KEY= "8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1" | |
def token_response(token: str): | |
return { | |
"access_token": token | |
} | |
def str_encode(string: str) -> str: | |
return base64.b85encode(string.encode('ascii')).decode('ascii') | |
def get_token_payload(token: str, secret: str, algo: str): | |
try: | |
payload = jwt.decode(token, secret, algorithms=algo) | |
except Exception as jwt_exec: | |
logging.debug(f"JWT Error: {str(jwt_exec)}") | |
payload = None | |
return payload | |
from datetime import datetime | |
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta): | |
expire = datetime.now() + expiry | |
payload.update({"exp": expire}) | |
return jwt.encode(payload, secret, algorithm=algo) | |
def str_decode(string: str) -> str: | |
return base64.b85decode(string.encode('ascii')).decode('ascii') | |
def generate_random_string(length=12): | |
characters = string.ascii_letters + string.digits | |
random_string = ''.join(random.choice(characters) for i in range(length)) | |
return random_string | |
import pytz | |
from datetime import datetime | |
def signJWT(user_email: str) -> Dict[str, str]: | |
rt_expires = timedelta(days=3) | |
refresh_key = unique_string(100) | |
access_key = unique_string(50) | |
at_expires = timedelta(minutes=180) | |
at_payload = { | |
"sub": str_encode(str(user_email)), | |
'a': access_key, | |
} | |
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key} | |
refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires) | |
expires_in = at_expires.seconds | |
vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh') | |
current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in) | |
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ') | |
existing_user = UserRepository.getUserByEmail(user_email) | |
if existing_user is None: | |
UserRepository.addUser(user_email, access_token, refresh_token, formatted_time) | |
else: | |
UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time) | |
user_record = UserRepository.getUserByEmail(user_email) | |
session_id = "" | |
if user_record: | |
session_id = generate_random_string() | |
existing_userlogin = UserLoginRepository.getUserLogin(user_email) | |
if existing_userlogin is None: | |
UserLoginRepository.addUserLogin(user_email,session_id=session_id) | |
else: | |
UserLoginRepository.updateUserLogin(user_email, session_id) | |
return { | |
"access_token": access_token, | |
"refresh_token": refresh_token, | |
"expires_in": at_expires.seconds, | |
"session_id": session_id | |
} | |
def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]: | |
access_key = unique_string(50) | |
at_expires = timedelta(minutes=180) | |
at_payload = { | |
"sub": str_encode(str(user_email)), | |
'a': access_key, | |
} | |
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires) | |
user_record = UserRepository.getUserByEmail(user_email) | |
session_id = "" | |
if user_record: | |
email1 = user_record.email | |
if email1: | |
session_id = generate_random_string() | |
existing_userlogin = UserLoginRepository.getUserLogin(user_email) | |
if existing_userlogin is None: | |
UserLoginRepository.addUserLogin(user_email,session_id=session_id) | |
else: | |
UserLoginRepository.updateUserLogin(user_email,session_id) | |
return { | |
"access_token": access_token, | |
"refresh_token": refresh_token, | |
"expires_in": at_expires.seconds, | |
"session_id": session_id | |
} | |
def decodeJWT(token: str) -> dict: | |
try: | |
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) | |
return decoded_token if decoded_token["exp"] >= time.time() else None | |
except: | |
return {} | |
def get_refresh_token(refresh_token, email): | |
token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM) | |
if not token_payload: | |
raise HTTPException(status_code=403, detail="Invalid Request.") | |
exp = token_payload.get('exp') | |
if exp >= time.time() and token_payload: | |
return returnAccessToken(email,refresh_token) | |
elif not token_payload: | |
return signJWT(email) |