Spaces:
Sleeping
Sleeping
from typing import Union | |
from datetime import timedelta | |
from request import RequestChat as req | |
from response import ResponseChat as res | |
from repository import ChatHistoryRepository, DetailChatRepository, UserRepository | |
import function.chatbot as sf | |
from typing import Dict | |
import json, re | |
from pydantic import BaseModel | |
from function import support_function | |
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
def check_email(email): | |
if(re.fullmatch(regex, email)): | |
return True | |
else: | |
return False | |
class Document(BaseModel): | |
page_content: str | |
metadata: Dict[str, str] | |
type: str | |
async def query2_upgrade_old(request: req.RequestQuery2UpgradeOld): | |
# try: | |
user_id = request.user_id | |
question = request.question | |
text_all = request.text_all | |
chat_name = request.chat_name | |
email = support_function.check_email_service(str(user_id)) | |
if isinstance(email, res.ReponseError): | |
return email | |
if question is None: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="question is empty") | |
) | |
if chat_name is None: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="chat_name is empty") | |
) | |
text_all_dicts = json.loads(text_all) | |
text_all1 = [Document(**doc) for doc in text_all_dicts] | |
chat_id_exist = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(user_id, chat_name) | |
chat_history = "" | |
if chat_id_exist: | |
list_chat_detail = DetailChatRepository.getListDetailChatByChatId(chat_id_exist) | |
for item in list_chat_detail: | |
chat_history += f"You: {item.YouMessage}\nAI: {item.AiMessage}\n\n" | |
test, list1, list2 = sf.handle_query_upgrade_keyword_old(question, text_all1, email,chat_history) | |
text1 = "<Data_Relevant>".join(list1) | |
text2 = "<Source_File>".join(list2) | |
id = 0 | |
if test: | |
chat_id = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(user_id,chat_name) | |
if chat_id: | |
id = DetailChatRepository.addDetailChat(chat_id,question, test, text1, text2) | |
if chat_id is None: | |
ChatHistoryRepository.addChatHistory(user_id,chat_name) | |
chat_id_new = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(user_id,chat_name) | |
id = DetailChatRepository.addDetailChat(chat_id_new, question, test, text1, text2) | |
return res.ResponseQuery2UpgradeOld( | |
status= 200, | |
data = res.DataAnswer1(id = id, | |
answer=test, | |
data_relevant=list1, | |
sources=list2)) | |
if test is None or test == "": | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message="No answer") | |
) | |
# except: | |
# return res.ReponseError( | |
# status=500, | |
# data =res.Message(message="Server Error") | |
# ) | |
async def extract_file(request: req.RequestExtractFile): | |
try: | |
user_id = request.user_id | |
email = support_function.check_email_service(str(user_id)) | |
if isinstance(email, res.ReponseError): | |
return email | |
text_all1 = sf.extract_data2(email) | |
if text_all1 is False: | |
return res.ResponseExtractFile( | |
status= 200, | |
data = res.DataExtractFile(text_all="No data response")) | |
return res.ResponseExtractFile( | |
status= 200, | |
data = res.DataExtractFile(text_all=text_all1)) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message="Server Error") | |
) | |
async def generate_question(request: req.RequestGenerateQuestion): | |
try: | |
user_id = request.user_id | |
email = support_function.check_email_service(str(user_id)) | |
if isinstance(email, res.ReponseError): | |
return email | |
text_all1 = sf.generate_question(email) | |
if text_all1 is False: | |
return res.ResponseGenerateQuestion( | |
status= 200, | |
data = res.GenerateQuestion(question=False)) | |
return res.ResponseGenerateQuestion( | |
status= 200, | |
data = res.GenerateQuestion(question=text_all1)) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) | |
async def delete_chat(request: req.RequestDeleteChat): | |
try: | |
user_id = request.user_id | |
email = support_function.check_email_service(str(user_id)) | |
if isinstance(email, res.ReponseError): | |
return email | |
chat_name = request.chat_name | |
if chat_name is None: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="chat_name is empty") | |
) | |
DetailChatRepository.delete_chat_detail(chat_name) | |
check = ChatHistoryRepository.deleteChatHistory(user_id,chat_name) | |
if check is False: | |
return res.ResponseDeleteChat( | |
status = 500, | |
data = res.Message(message="Delete conversation chat failed")) | |
else: | |
return res.ResponseDeleteChat( | |
status = 200, | |
data = res.Message(message="Delete conversation chat success")) | |
except: | |
return res.ReponseError( | |
status= 500, | |
data = res.Message(message="Server Error") | |
) |