pychatbot / auth /authentication.py
kltn20133118's picture
Upload 258 files
13ba451 verified
raw
history blame
5.16 kB
import time
from typing import Dict
import jwt
import secrets
import logging
from fastapi import Depends, HTTPException
import base64
from datetime import datetime, timedelta
from repository import UserRepository, UserLoginRepository
import string, random
def check_token_is_valid(token):
check = UserRepository.getEmailUserByAccessToken(token)
if check is None:
return False
return True
def unique_string(byte: int = 8) -> str:
return secrets.token_urlsafe(byte)
JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
JWT_ALGORITHM = "HS512"
SECRET_KEY= "8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"
def token_response(token: str):
return {
"access_token": token
}
def str_encode(string: str) -> str:
return base64.b85encode(string.encode('ascii')).decode('ascii')
def get_token_payload(token: str, secret: str, algo: str):
try:
payload = jwt.decode(token, secret, algorithms=algo)
except Exception as jwt_exec:
logging.debug(f"JWT Error: {str(jwt_exec)}")
payload = None
return payload
from datetime import datetime
def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
expire = datetime.now() + expiry
payload.update({"exp": expire})
return jwt.encode(payload, secret, algorithm=algo)
def str_decode(string: str) -> str:
return base64.b85decode(string.encode('ascii')).decode('ascii')
def generate_random_string(length=12):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for i in range(length))
return random_string
import pytz
from datetime import datetime
def signJWT(user_email: str) -> Dict[str, str]:
rt_expires = timedelta(days=3)
refresh_key = unique_string(100)
access_key = unique_string(50)
at_expires = timedelta(minutes=180)
at_payload = {
"sub": str_encode(str(user_email)),
'a': access_key,
}
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
expires_in = at_expires.seconds
vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
existing_user = UserRepository.getUserByEmail(user_email)
if existing_user is None:
UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
else:
UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
user_record = UserRepository.getUserByEmail(user_email)
session_id = ""
if user_record:
session_id = generate_random_string()
existing_userlogin = UserLoginRepository.getUserLogin(user_email)
if existing_userlogin is None:
UserLoginRepository.addUserLogin(user_email,session_id=session_id)
else:
UserLoginRepository.updateUserLogin(user_email, session_id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": at_expires.seconds,
"session_id": session_id
}
def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
access_key = unique_string(50)
at_expires = timedelta(minutes=180)
at_payload = {
"sub": str_encode(str(user_email)),
'a': access_key,
}
access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
user_record = UserRepository.getUserByEmail(user_email)
session_id = ""
if user_record:
email1 = user_record.email
if email1:
session_id = generate_random_string()
existing_userlogin = UserLoginRepository.getUserLogin(user_email)
if existing_userlogin is None:
UserLoginRepository.addUserLogin(user_email,session_id=session_id)
else:
UserLoginRepository.updateUserLogin(user_email,session_id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": at_expires.seconds,
"session_id": session_id
}
def decodeJWT(token: str) -> dict:
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return decoded_token if decoded_token["exp"] >= time.time() else None
except:
return {}
def get_refresh_token(refresh_token, email):
token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
if not token_payload:
raise HTTPException(status_code=403, detail="Invalid Request.")
exp = token_payload.get('exp')
if exp >= time.time() and token_payload:
return returnAccessToken(email,refresh_token)
elif not token_payload:
return signJWT(email)