Spaces:
Sleeping
Sleeping
File size: 5,494 Bytes
13ba451 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
from typing import Union
from datetime import timedelta
from request import RequestChat as req
from response import ResponseChat as res
from repository import ChatHistoryRepository, DetailChatRepository, UserRepository
import function.chatbot as sf
from typing import Dict
import json, re
from pydantic import BaseModel
from function import support_function
# Pattern for a syntactically plausible e-mail address: a local part, "@",
# a domain, and a final alphabetic label of 2 to 7 letters.
# The last character class is [A-Za-z]; a "|" inside a character class is a
# literal pipe, not alternation, so it must not appear there.
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'


def check_email(email):
    """Return True when *email* is a string that fully matches ``regex``.

    Args:
        email: Candidate address. Any non-string value is rejected.

    Returns:
        bool: True if the whole string matches the pattern, False otherwise.
    """
    # re.fullmatch raises TypeError on non-string input; treat that as "invalid".
    if not isinstance(email, str):
        return False
    return re.fullmatch(regex, email) is not None
class Document(BaseModel):
    """Pydantic model holding a text body, string metadata and a type tag."""

    # Text content of the document.
    page_content: str
    # Metadata map; both keys and values are declared as strings.
    metadata: Dict[str, str]
    # Type tag. The name shadows the builtin ``type`` inside the class body,
    # but it is part of the model's field schema and therefore kept as-is.
    type: str
async def query2_upgrade_old(request: req.RequestQuery2UpgradeOld):
    """Answer a question against supplied documents and record the exchange.

    Reads ``user_id``, ``question``, ``text_all`` and ``chat_name`` from
    *request*. ``text_all`` is parsed with ``json.loads`` and each element is
    expanded into a ``Document``. Any existing detail rows for the chat are
    folded into a "You/AI" transcript that is passed to
    ``sf.handle_query_upgrade_keyword_old`` together with the question, the
    documents and the e-mail.

    Returns:
        ``res.ResponseQuery2UpgradeOld`` (status 200) when an answer is
        produced; otherwise ``res.ReponseError`` with status 400 for a missing
        question / chat name or an unparsable ``text_all``, status 500 with
        "No answer" when no answer is produced, or whatever error object
        ``support_function.check_email_service`` returned.
    """
    user_id = request.user_id
    question = request.question
    text_all = request.text_all
    chat_name = request.chat_name

    email = support_function.check_email_service(str(user_id))
    if isinstance(email, res.ReponseError):
        return email
    if question is None:
        return res.ReponseError(
            status=400,
            data=res.Message(message="question is empty"),
        )
    if chat_name is None:
        return res.ReponseError(
            status=400,
            data=res.Message(message="chat_name is empty"),
        )

    # json.JSONDecodeError is a ValueError; a None payload or a non-mapping
    # element raises TypeError. Pydantic's ValidationError is documented as a
    # ValueError subclass. Report all of these as a client error instead of
    # letting them escape the handler.
    try:
        documents = [Document(**doc) for doc in json.loads(text_all)]
    except (TypeError, ValueError):
        return res.ReponseError(
            status=400,
            data=res.Message(message="text_all is invalid"),
        )

    existing_chat_id = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
        user_id, chat_name
    )
    chat_history = ""
    if existing_chat_id:
        chat_history = "".join(
            f"You: {item.YouMessage}\nAI: {item.AiMessage}\n\n"
            for item in DetailChatRepository.getListDetailChatByChatId(existing_chat_id)
        )

    answer, data_relevant, sources = sf.handle_query_upgrade_keyword_old(
        question, documents, email, chat_history
    )

    # Guard first: any falsy answer (None, "", or another empty value) is a
    # failure. Checking before the joins below also avoids joining over lists
    # that may not be valid when no answer was produced.
    if not answer:
        return res.ReponseError(
            status=500,
            data=res.Message(message="No answer"),
        )

    relevant_text = "<Data_Relevant>".join(data_relevant)
    sources_text = "<Source_File>".join(sources)

    detail_id = 0
    # The chat id is looked up again here rather than reusing the earlier
    # lookup, preserving the original sequence of repository calls.
    chat_id = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
        user_id, chat_name
    )
    if chat_id:
        detail_id = DetailChatRepository.addDetailChat(
            chat_id, question, answer, relevant_text, sources_text
        )
    elif chat_id is None:
        # No chat with this name yet: create it, then fetch its id.
        ChatHistoryRepository.addChatHistory(user_id, chat_name)
        new_chat_id = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(
            user_id, chat_name
        )
        detail_id = DetailChatRepository.addDetailChat(
            new_chat_id, question, answer, relevant_text, sources_text
        )

    return res.ResponseQuery2UpgradeOld(
        status=200,
        data=res.DataAnswer1(
            id=detail_id,
            answer=answer,
            data_relevant=data_relevant,
            sources=sources,
        ),
    )
async def extract_file(request: req.RequestExtractFile):
    """Return the data extracted for the requesting user.

    Resolves the user's e-mail through
    ``support_function.check_email_service`` and passes it to
    ``sf.extract_data2``.

    Returns:
        ``res.ResponseExtractFile`` (status 200) whose ``text_all`` is the
        extraction result, or the literal "No data response" when the helper
        returned ``False``; the error object from the e-mail check if that
        failed; or ``res.ReponseError`` (status 500) on any other exception.
    """
    try:
        email = support_function.check_email_service(str(request.user_id))
        if isinstance(email, res.ReponseError):
            return email

        extracted = sf.extract_data2(email)
        payload = "No data response" if extracted is False else extracted
        return res.ResponseExtractFile(
            status=200,
            data=res.DataExtractFile(text_all=payload),
        )
    except Exception:
        # `except Exception` rather than a bare `except`, so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals still propagate.
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error"),
        )
async def generate_question(request: req.RequestGenerateQuestion):
    """Return the questions generated for the requesting user.

    Resolves the user's e-mail through
    ``support_function.check_email_service`` and passes it to
    ``sf.generate_question``.

    Returns:
        ``res.ResponseGenerateQuestion`` (status 200) whose ``question`` field
        carries the helper's result (``False`` is passed through unchanged);
        the error object from the e-mail check if that failed; or
        ``res.ReponseError`` (status 500) on any other exception.
    """
    try:
        email = support_function.check_email_service(str(request.user_id))
        if isinstance(email, res.ReponseError):
            return email

        # A result of False is forwarded as question=False, so a single return
        # covers both the "nothing generated" and the success case.
        generated = sf.generate_question(email)
        return res.ResponseGenerateQuestion(
            status=200,
            data=res.GenerateQuestion(question=generated),
        )
    except Exception:
        # `except Exception` rather than a bare `except`, so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals still propagate.
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error"),
        )
async def delete_chat(request: req.RequestDeleteChat):
    """Delete a named chat and its detail rows for the requesting user.

    Returns:
        ``res.ResponseDeleteChat`` with status 200 on success or status 500
        when ``ChatHistoryRepository.deleteChatHistory`` returns ``False``;
        ``res.ReponseError`` with status 400 when ``chat_name`` is missing;
        the error object from the e-mail check if that failed; or
        ``res.ReponseError`` (status 500) on any other exception.
    """
    try:
        user_id = request.user_id
        email = support_function.check_email_service(str(user_id))
        if isinstance(email, res.ReponseError):
            return email

        chat_name = request.chat_name
        if chat_name is None:
            return res.ReponseError(
                status=400,
                data=res.Message(message="chat_name is empty"),
            )

        # NOTE(review): detail rows are deleted by chat_name alone, with no
        # user_id - confirm the repository scopes this to the right user.
        # NOTE(review): details are removed before the history row, so a
        # failed history delete leaves the details already gone.
        DetailChatRepository.delete_chat_detail(chat_name)
        deleted = ChatHistoryRepository.deleteChatHistory(user_id, chat_name)
        if deleted is False:
            return res.ResponseDeleteChat(
                status=500,
                data=res.Message(message="Delete conversation chat failed"),
            )
        return res.ResponseDeleteChat(
            status=200,
            data=res.Message(message="Delete conversation chat success"),
        )
    except Exception:
        # `except Exception` rather than a bare `except`, so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals still propagate.
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error"),
        )