|
from fastapi import FastAPI
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from entity import InputHotTopic, ResponseQueue
|
|
from threading import Thread
|
|
from queue import Queue
|
|
import time
|
|
import json, requests
|
|
from service_cache_Thien import get_data_sorl
|
|
import os
|
|
from datetime import datetime
|
|
from email_validator import push_msg_tele
|
|
|
|
import time
|
|
import json
|
|
import hashlib
|
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
class InputHotTopic(BaseModel):
|
|
start_time: str = "2024-09-03 23:00:00"
|
|
end_time: str = "2024-09-05 23:00:00"
|
|
query: str = "Giá nhà chung cư trên Hà Nội"
|
|
keywords: list = ["chung cư, Hà Nội", "Hoà Lạc"]
|
|
top_cluster: int = 5
|
|
prompt: str = """Trong 300 từ, hãy tổng hợp thành một đoạn văn một cách đầy đủ, chi tiết, và trung thực về các chủ đề xung quanh biến động giá nhà chung cư Hà Nội từ nội dung dưới đây.
|
|
Nếu không có thông tin gì liên quan đến giá nhà chung cư Hà Nội trong nội dung cung cấp thì trả lời "không có thông tin". Không đưa quan điểm cá nhân, không lặp lại một phần câu hỏi, loại bỏ phần mở đầu. Không có những câu từ liên kết như: "Sau đây là nội dung tóm tắt", "Nội dung tóm tắt là", "Dưới đây là " ... """
|
|
check_relevent: str = "Hãy đánh giá nội dung dưới đây có thông tin liên quan đến giá cả nhà chung cư Hà Nội hay không? Chỉ trả lời có hoặc không, không đưa thêm thông tin không liên quan"
|
|
max_posts: int = 5000
|
|
|
|
def get_hash_id(item: InputHotTopic):
|
|
str_hash = ""
|
|
if item.id_topic:
|
|
str_hash += item.id_topic
|
|
str_hash += item.start_time
|
|
return hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
|
|
else:
|
|
return ""
|
|
|
|
class SessionProcess(object):
|
|
|
|
def __init__(self):
|
|
self.session = dict()
|
|
|
|
def hash_session(self, query: InputHotTopic):
|
|
hash_dict = query.dict()
|
|
hash_dict['time'] = int(time.time())
|
|
return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest()
|
|
|
|
def insert_session(self, data_input):
|
|
print('data_input: ', data_input)
|
|
|
|
|
|
|
|
hash_id = self.hash_session(data_input)
|
|
if hash_id not in self.session:
|
|
self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(),
|
|
"result": {}, "data": data_input}
|
|
return hash_id
|
|
|
|
def get_info_session(self, hash_id: str):
|
|
if hash_id in self.session:
|
|
return self.session[hash_id]
|
|
return {"status": -2, "result": {}, "meta": {}}
|
|
|
|
def update_session(self, hash_id: str, result: dict, status: int):
|
|
if hash_id in self.session:
|
|
self.session[hash_id]["status"] = status
|
|
self.session[hash_id]["result"] = result
|
|
self.session[hash_id]["update_time"] = time.time()
|
|
return True
|
|
return False
|
|
|
|
def delete_session(self, hash_id: str):
|
|
if hash_id in self.session:
|
|
del self.session[hash_id]
|
|
return True
|
|
return False
|
|
|
|
SESSION = SessionProcess()
|
|
app = FastAPI(title="Hot Topic")
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
NUM_OF_THREAD = 2
|
|
QQ = Queue(maxsize=0)
|
|
|
|
def process_wc():
|
|
print('Run thr')
|
|
global SESSION, QQ
|
|
while True:
|
|
if not QQ.empty():
|
|
hash_id = QQ.get()
|
|
SESSION.update_session(hash_id, {}, 0)
|
|
print("update trạng thái status = 0: đang xử lý")
|
|
try:
|
|
ss_info = SESSION.get_info_session(hash_id)
|
|
status = ss_info["status"]
|
|
print("trạng thái hiện tại: ", status)
|
|
if status == 0:
|
|
data_input = SESSION.session[hash_id]["data"]
|
|
res_doc = get_data_sorl(data_input.query, data_input.keywords, data_input.start_time, data_input.end_time, max_posts = data_input.max_posts)
|
|
print('lenght res_doc: ', len(res_doc))
|
|
if not res_doc:
|
|
SESSION.update_session(hash_id, {}, -1)
|
|
else:
|
|
|
|
current_time = datetime.now()
|
|
time_now = current_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
d = {
|
|
"id_topic": "99999",
|
|
"start_time": time_now,
|
|
"end_time": data_input.end_time,
|
|
"threshold": 0.3,
|
|
"top_sentence": -1,
|
|
"top_cluster": data_input.top_cluster,
|
|
"topn_summary": 10,
|
|
"type_cluster": "",
|
|
"lang_process": "",
|
|
"prompt": data_input.prompt,
|
|
"topic_name": data_input.check_relevent,
|
|
"responseHeader": {},
|
|
"benchmark_topics": [],
|
|
"response": {"docs": res_doc}
|
|
}
|
|
|
|
str_hash = ""
|
|
str_hash += "99999"
|
|
str_hash += time_now
|
|
hash_id_path = hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
|
|
|
|
st_time = time.time()
|
|
try:
|
|
response = requests.post('http://10.9.3.241:8636/newsanalysis/topic_clustering', json=d, timeout=5)
|
|
except:
|
|
print("Timeout done")
|
|
|
|
print("push done msg")
|
|
res_clus = {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("update done msg")
|
|
SESSION.update_session(hash_id_path, res_clus, 1)
|
|
except Exception as ve_:
|
|
print(ve_)
|
|
SESSION.update_session(hash_id_path, {}, -1)
|
|
raise ve_
|
|
else:
|
|
time.sleep(2)
|
|
|
|
|
|
for _ in range(NUM_OF_THREAD):
|
|
worker = Thread(target=process_wc, args=())
|
|
worker.setDaemon(True)
|
|
worker.start()
|
|
|
|
|
|
@app.post("/api/v1/send_message")
|
|
def send_requests(item: InputHotTopic):
|
|
global SESSION
|
|
hash_id = SESSION.insert_session(item)
|
|
if SESSION.session[hash_id]["status"] == 0:
|
|
QQ.put(hash_id)
|
|
|
|
return ResponseQueue(statusCode=1, message="Push to queue done !", result={"hash_id": hash_id})
|
|
|
|
class InputSession(BaseModel):
|
|
hash_id: str = ""
|
|
|
|
class Response(BaseModel):
|
|
statusCode: int = 200
|
|
message: str = ""
|
|
result: dict = {}
|
|
@app.post("/api/mining/qna/result")
|
|
def get_result(item: InputSession):
|
|
global SESSION
|
|
res = SESSION.get_info_session(item.hash_id)
|
|
status = res["status"]
|
|
res = res["result"]
|
|
if status == -1:
|
|
msg = "ERROR"
|
|
elif status == 0:
|
|
msg = "processing ..."
|
|
elif status == 1:
|
|
msg = "done"
|
|
|
|
else:
|
|
msg = "nothing"
|
|
return Response(statusCode=status, message=msg, result=res) |