pychatbot / function /support_function.py
kltn20133118's picture
Upload 258 files
13ba451 verified
from pydantic.error_wrappers import ErrorWrapper
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from service import MySQLService,UserService,ChatService
from request import RequestMySQL,RequestUser,RequestDefault
from auth.authentication import decodeJWT
from repository import UserRepository
from auth import authentication
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, Form, File, UploadFile
from typing import List
from service import FileService,DefaultService,UserService
from request import RequestFile,RequestChat,RequestDefault
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic.error_wrappers import ErrorWrapper
import json
from function import support_function
from repository import UserRepository
from response import ResponseDefault as res
import re
def is_positive_integer(value):
    """Return True when *value* is an ``int`` strictly greater than zero.

    ``bool`` is a subclass of ``int`` in Python, so without an explicit
    guard ``True`` would be accepted as the positive integer 1. Booleans
    are therefore rejected. Any other non-integer value (str, float,
    None, ...) yields False.
    """
    if isinstance(value, bool):
        return False
    return isinstance(value, int) and value > 0
def check_value_user_id_controller(user_id: str):
    """Validate the textual form of a ``user_id`` request value.

    The value must be present, non-blank, parseable by ``int()`` once
    surrounding single/double quotes are removed, and greater than zero.

    Returns:
        ``True`` when every check passes; otherwise a ``res.ReponseError``
        with status 400 whose message names the first failed check.
    """
    def _bad_request(text):
        # All failures of this validator share the same 400 envelope.
        return res.ReponseError(status=400,
                                data=res.Message(message=text))

    if user_id is None or not user_id.strip():
        return _bad_request("user_id field is required.")

    # Drop surrounding quote characters before the numeric conversion.
    unquoted = user_id.strip("'").strip('"')
    try:
        numeric_id = int(unquoted)
    except ValueError:
        return _bad_request("user_id must be an integer")

    # int() always yields an int, so positivity is the only remaining check.
    if numeric_id <= 0:
        return _bad_request("user_id must be greater than 0")
    return True
def check_value_user_id(user_id: str, current_user_email: str):
    """Validate ``user_id`` and confirm it belongs to the current user.

    The value must be present, non-blank, parseable by ``int()`` once
    surrounding quotes are removed, and greater than zero. The unquoted
    string is then looked up through
    ``UserRepository.getEmailUserByIdFix`` and the first element of the
    result is compared with ``current_user_email``.

    Returns:
        ``True`` when every check passes; otherwise a ``res.ReponseError``
        (status 400 for malformed input, 404 when the lookup returns
        ``None``).

    Raises:
        HTTPException: status 403 when the looked-up e-mail differs from
            ``current_user_email``.
    """
    def _failure(code, text):
        return res.ReponseError(status=code,
                                data=res.Message(message=text))

    if user_id is None or not user_id.strip():
        return _failure(400, "user_id field is required.")

    # Drop surrounding quote characters before the numeric conversion.
    unquoted = user_id.strip("'").strip('"')
    try:
        numeric_id = int(unquoted)
    except ValueError:
        return _failure(400, "user_id must be an integer")

    # int() always yields an int, so positivity is the only remaining check.
    if numeric_id <= 0:
        return _failure(400, "user_id must be greater than 0")

    # The repository receives the unquoted string, not the parsed integer.
    owner_row = UserRepository.getEmailUserByIdFix(unquoted)
    if owner_row is None:
        return _failure(404, "user_id not exist")

    if owner_row[0] != current_user_email:
        raise HTTPException(status_code=403, detail="Sorry, you can't perform actions with this user id.")
    return True
def check_value_email_controller(email: str):
    """Check that the raw ``email`` request value is present and not a number.

    This validator does not inspect the address format; it only rejects
    missing/blank values and values that ``int()`` can parse.

    Returns:
        ``True`` when the value is non-blank and not an integer literal;
        otherwise a ``res.ReponseError`` with status 400.
    """
    if email is None or email.strip() == "":
        return res.ReponseError(status=400,
                                data=res.Message(message="Email is required."))

    # Keep only the int() probe inside the try body: a ValueError raised
    # while building the error response must not be mistaken for
    # "the value is not a number" and silently turned into success.
    try:
        int(email)
    except ValueError:
        return True

    return res.ReponseError(status=400,
                            data=res.Message(message="Email must be a string, not a number."))
def check_value_otp(otp: str):
    """Validate the textual form of a one-time password.

    The value must be present and non-blank, must not consist solely of
    digits, and must be exactly six characters long.

    Returns:
        ``True`` when every check passes; otherwise a ``res.ReponseError``
        with status 400 whose message names the first failed check.
    """
    # Blank input (empty or whitespace-only) counts as missing, matching
    # the other validators in this module. Without this, six spaces would
    # pass validation.
    if otp is None or otp.strip() == "":
        return res.ReponseError(status=400,
                                data=res.Message(message="OTP is required"))

    # NOTE(review): all-digit OTPs are rejected here; confirm this matches
    # how OTPs are generated elsewhere in the project.
    if otp.isdigit():
        return res.ReponseError(status=400,
                                data=res.Message(message="OTP must be a string, not a number."))

    # The check is for an exact length of 6; the message text is kept
    # unchanged for client compatibility.
    if len(otp) != 6:
        return res.ReponseError(status=400,
                                data=res.Message(message="OTP max length is 6"))
    return True
# Pattern for a syntactically plausible e-mail address: local part, "@",
# domain, and an alphabetic top-level domain of 2-7 letters.
# The TLD class is [A-Za-z]; writing "|" inside a character class would
# match a literal pipe rather than act as alternation.
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'


def check_email(email):
    """Return True when *email* fully matches the module-level ``regex``.

    Non-string input (including ``None``) is reported as invalid instead
    of raising ``TypeError`` from ``re.fullmatch``.
    """
    if not isinstance(email, str):
        return False
    return re.fullmatch(regex, email) is not None
def check_email_service(user_id: str):
    """Fetch and validate the e-mail associated with ``user_id``.

    The lookup goes through ``UserRepository.getEmailUserByIdFix``; the
    first element of its result is treated as the e-mail address.

    Returns:
        The e-mail string when it exists and passes ``check_email``;
        otherwise a ``res.ReponseError`` (404 when the lookup returns
        ``None``, 400 when the e-mail is ``None`` or fails validation).
    """
    def _failure(code, text):
        return res.ReponseError(
            status=code,
            data=res.Message(message=text)
        )

    record = UserRepository.getEmailUserByIdFix(user_id)
    if record is None:
        return _failure(404, "Id not exist")

    address = record[0]
    if address is None:
        return _failure(400, "Email is empty")
    if not check_email(address):
        return _failure(400, "Email invalid")
    return address
def check_email_empty_invalid(email: str):
    """Check that ``email`` is non-empty and passes ``check_email``.

    Returns:
        ``True`` when the address is acceptable; otherwise a
        ``res.ReponseError`` with status 400 and a message saying whether
        the value was empty or invalid.
    """
    # Pick the failure message first, then build the response in one place.
    problem = None
    if email is None or email == "":
        problem = "Email is empty"
    elif not check_email(email):
        problem = "Email invalid"

    if problem is None:
        return True
    return res.ReponseError(
        status=400,
        data=res.Message(message=problem)
    )