Spaces:
Sleeping
Sleeping
from pydantic.error_wrappers import ErrorWrapper | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from service import MySQLService,UserService,ChatService | |
from request import RequestMySQL,RequestUser,RequestDefault | |
from auth.authentication import decodeJWT | |
from repository import UserRepository | |
from auth import authentication | |
from datetime import datetime, timedelta | |
from fastapi import Depends, HTTPException, Form, File, UploadFile | |
from typing import List | |
from service import FileService,DefaultService,UserService | |
from request import RequestFile,RequestChat,RequestDefault | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import JSONResponse | |
from pydantic.error_wrappers import ErrorWrapper | |
import json | |
from function import support_function | |
from repository import UserRepository | |
from response import ResponseDefault as res | |
import re | |
def is_positive_integer(value): | |
if isinstance(value, int) and value > 0: | |
return True | |
else: | |
return False | |
def check_value_user_id_controller(user_id: str): | |
if user_id is None or user_id.strip() == "": | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id field is required.")) | |
user_id = user_id.strip("'").strip('"') | |
try: | |
user_id_int = int(user_id) | |
except ValueError: | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id must be an integer")) | |
if not support_function.is_positive_integer(user_id_int): | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id must be greater than 0")) | |
return True | |
def check_value_user_id(user_id: str, current_user_email: str): | |
if user_id is None or user_id.strip() == "": | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id field is required.")) | |
user_id = user_id.strip("'").strip('"') | |
try: | |
user_id_int = int(user_id) | |
except ValueError: | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id must be an integer")) | |
if not support_function.is_positive_integer(user_id_int): | |
return res.ReponseError(status=400, | |
data=res.Message(message="user_id must be greater than 0")) | |
email = UserRepository.getEmailUserByIdFix(user_id) | |
if email is None: | |
return res.ReponseError(status=404, | |
data=res.Message(message="user_id not exist")) | |
email = email[0] | |
if email != current_user_email: | |
raise HTTPException(status_code=403, detail="Sorry, you can't perform actions with this user id.") | |
return True | |
def check_value_email_controller(email: str): | |
if email is None or email.strip() == "": | |
return res.ReponseError(status = 400, | |
data = res.Message(message="Email is required.")) | |
try: | |
int(email) | |
return res.ReponseError(status=400, | |
data=res.Message(message="Email must be a string, not a number.")) | |
except ValueError: | |
pass | |
return True | |
def check_value_otp(otp: str): | |
if otp is None: | |
return res.ReponseError(status=400, | |
data=res.Message(message="OTP is required")) | |
if otp.isdigit(): | |
return res.ReponseError(status=400, | |
data=res.Message(message="OTP must be a string, not a number.")) | |
if len(otp) != 6: | |
return res.ReponseError(status=400, | |
data=res.Message(message="OTP max length is 6")) | |
return True | |
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
def check_email(email): | |
if(re.fullmatch(regex, email)): | |
return True | |
else: | |
return False | |
def check_email_service(user_id: str): | |
email1 = UserRepository.getEmailUserByIdFix(user_id) | |
if email1 is None: | |
return res.ReponseError( | |
status=404, | |
data=res.Message(message="Id not exist") | |
) | |
email = email1[0] | |
if email is None: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Email is empty") | |
) | |
if check_email(email) == False: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Email invalid") | |
) | |
return email | |
def check_email_empty_invalid(email: str): | |
if email is None or email == "": | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Email is empty") | |
) | |
if check_email(email) == False: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="Email invalid") | |
) | |
return True |