TextSummarization / models /security.py
asFrants's picture
remove templates
11fe449
import os
import secrets
from typing import Annotated
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi import status, Depends, Cookie
from datetime import datetime, timedelta
from collections import defaultdict
from loguru import logger
from dotenv import load_dotenv
from models.exceptions import MyHTTPException
load_dotenv()
API_USER = os.getenv("API_USER")
API_PWD = os.getenv("API_PWD")
API_KEY = os.getenv("API_KEY")
security = HTTPBasic()
class AuthUsers:
REQUESTS_LIMIT = 3 # 3 REQUESTS PER MINUTE
AUTH_TIME = 1 # 1 MINUTES
users: set
users_auth: defaultdict
def __init__(self):
self.users = set()
self.users_auth = defaultdict(list)
def generate_user_token(self) -> str:
return secrets.token_hex(16)
def verify_user(self, user_token: str) -> bool:
logger.info(f"Check user token: {user_token}")
if user_token in self.users:
print(self.users_auth[user_token])
if len(self.users_auth[user_token]) < self.REQUESTS_LIMIT:
return True
elif datetime.now() - self.users_auth[user_token][
-self.REQUESTS_LIMIT
] >= timedelta(minutes=self.AUTH_TIME):
return True
else:
raise MyHTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many requests. Try again later.",
)
logger.info(f"User {user_token} not found")
raise MyHTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized user",
)
def add_user_session(self, user_token: str):
if user_token in self.users:
self.users_auth[user_token].append(datetime.now())
else:
self.users.add(user_token)
self.users_auth.setdefault(user_token, []).append(datetime.now())
print(self.users_auth[user_token])
while len(self.users_auth[user_token]) > self.REQUESTS_LIMIT:
self.users_auth[user_token].pop(0)
logger.info(f"User's {user_token} session added")
def get_cookie_data(
self, user_token: str = Cookie(default=None, alias="Authorization")
) -> str:
if not user_token:
user_token = self.generate_user_token()
if user_token not in self.users:
self.users.add(user_token)
logger.info("Unauthorized user")
logger.info(f"Verified user with token: {user_token}")
return user_token
credentials_exception = MyHTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Login or password is incorrect",
headers={"WWW-Authenticate": "Basic"},
)
def check_api_credentials(
credentials: Annotated[HTTPBasicCredentials, Depends(security)]
):
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = bytes(API_USER, "utf-8")
is_correct_username = secrets.compare_digest(
current_username_bytes, correct_username_bytes
)
current_password_bytes = credentials.password.encode("utf8")
correct_password_bytes = bytes(API_PWD, "utf-8")
is_correct_password = secrets.compare_digest(
current_password_bytes, correct_password_bytes
)
if not (is_correct_username and is_correct_password):
raise credentials_exception
return credentials.username