topic-clustering-global-dashboard / consumer_hot_topic_ondemand.py
cuongnguyen910's picture
Upload folder using huggingface_hub
5120311 verified
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from entity import InputHotTopic, ResponseQueue
from threading import Thread
from queue import Queue
import time
import json, requests
from service_cache_Thien import get_data_sorl
import os
from datetime import datetime
from email_validator import push_msg_tele
import time
import json
import hashlib
from pydantic import BaseModel
class InputHotTopic(BaseModel):
start_time: str = "2024-09-03 23:00:00"
end_time: str = "2024-09-05 23:00:00"
query: str = "Giá nhà chung cư trên Hà Nội"
keywords: list = ["chung cư, Hà Nội", "Hoà Lạc"]
top_cluster: int = 5
prompt: str = """Trong 300 từ, hãy tổng hợp thành một đoạn văn một cách đầy đủ, chi tiết, và trung thực về các chủ đề xung quanh biến động giá nhà chung cư Hà Nội từ nội dung dưới đây.
Nếu không có thông tin gì liên quan đến giá nhà chung cư Hà Nội trong nội dung cung cấp thì trả lời "không có thông tin". Không đưa quan điểm cá nhân, không lặp lại một phần câu hỏi, loại bỏ phần mở đầu. Không có những câu từ liên kết như: "Sau đây là nội dung tóm tắt", "Nội dung tóm tắt là", "Dưới đây là " ... """
check_relevent: str = "Hãy đánh giá nội dung dưới đây có thông tin liên quan đến giá cả nhà chung cư Hà Nội hay không? Chỉ trả lời có hoặc không, không đưa thêm thông tin không liên quan"
max_posts: int = 5000
def get_hash_id(item: InputHotTopic):
str_hash = ""
if item.id_topic:
str_hash += item.id_topic
str_hash += item.start_time
return hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
else:
return ""
class SessionProcess(object):
def __init__(self):
self.session = dict()
def hash_session(self, query: InputHotTopic):
hash_dict = query.dict()
hash_dict['time'] = int(time.time())
return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest()
def insert_session(self, data_input):
print('data_input: ', data_input)
# if self.mode == "command_center":
# hash_id = hash_session(data_input)
# else:
hash_id = self.hash_session(data_input)
if hash_id not in self.session:
self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(),
"result": {}, "data": data_input}
return hash_id
def get_info_session(self, hash_id: str):
if hash_id in self.session:
return self.session[hash_id]
return {"status": -2, "result": {}, "meta": {}}
def update_session(self, hash_id: str, result: dict, status: int):
if hash_id in self.session:
self.session[hash_id]["status"] = status
self.session[hash_id]["result"] = result
self.session[hash_id]["update_time"] = time.time()
return True
return False
def delete_session(self, hash_id: str):
if hash_id in self.session:
del self.session[hash_id]
return True
return False
SESSION = SessionProcess()
app = FastAPI(title="Hot Topic")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
NUM_OF_THREAD = 2
QQ = Queue(maxsize=0) # don't limit queue
def process_wc():
print('Run thr')
global SESSION, QQ
while True:
if not QQ.empty():
hash_id = QQ.get()
SESSION.update_session(hash_id, {}, 0)
print("update trạng thái status = 0: đang xử lý")
try:
ss_info = SESSION.get_info_session(hash_id)
status = ss_info["status"]
print("trạng thái hiện tại: ", status)
if status == 0:
data_input = SESSION.session[hash_id]["data"]
res_doc = get_data_sorl(data_input.query, data_input.keywords, data_input.start_time, data_input.end_time, max_posts = data_input.max_posts)
print('lenght res_doc: ', len(res_doc))
if not res_doc:
SESSION.update_session(hash_id, {}, -1)
else:
# start_time: str = "2024-03-03 23:00:00"
current_time = datetime.now()
time_now = current_time.strftime("%Y-%m-%d %H:%M:%S")
d = {
"id_topic": "99999",
"start_time": time_now,
"end_time": data_input.end_time,
"threshold": 0.3,
"top_sentence": -1,
"top_cluster": data_input.top_cluster,
"topn_summary": 10,
"type_cluster": "",
"lang_process": "",
"prompt": data_input.prompt,
"topic_name": data_input.check_relevent,
"responseHeader": {},
"benchmark_topics": [],
"response": {"docs": res_doc}
}
str_hash = ""
str_hash += "99999"
str_hash += time_now
hash_id_path = hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
st_time = time.time()
try:
response = requests.post('http://10.9.3.241:8636/newsanalysis/topic_clustering', json=d, timeout=5)
except:
print("Timeout done")
print("push done msg")
res_clus = {}
# flag = False
# count = 0
# while not flag and count < 18000:
# if os.path.exists("/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)):
# path_res = "/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)
# with open(path_res, encoding="utf-8") as ff:
# res_clus = json.load(ff)
# res_clus["num_articles"] = len(res_doc)
# message = "Hello"
# push_msg_tele(data_input.bot_token , data_input.chat_id , message)
# print('done processing result')
# flag = True
# time.sleep(1)
# count +=1
# print('sleep: ', count)
print("update done msg")
SESSION.update_session(hash_id_path, res_clus, 1)
except Exception as ve_:
print(ve_)
SESSION.update_session(hash_id_path, {}, -1)
raise ve_
else:
time.sleep(2)
for _ in range(NUM_OF_THREAD):
worker = Thread(target=process_wc, args=())
worker.setDaemon(True)
worker.start()
@app.post("/api/v1/send_message")
def send_requests(item: InputHotTopic):
global SESSION
hash_id = SESSION.insert_session(item)
if SESSION.session[hash_id]["status"] == 0:
QQ.put(hash_id)
return ResponseQueue(statusCode=1, message="Push to queue done !", result={"hash_id": hash_id})
class InputSession(BaseModel):
hash_id: str = ""
class Response(BaseModel):
statusCode: int = 200
message: str = ""
result: dict = {}
@app.post("/api/mining/qna/result")
def get_result(item: InputSession):
global SESSION
res = SESSION.get_info_session(item.hash_id)
status = res["status"]
res = res["result"]
if status == -1:
msg = "ERROR"
elif status == 0:
msg = "processing ..."
elif status == 1:
msg = "done"
# SESSION.delete_session(item.hash_id)
else:
msg = "nothing"
return Response(statusCode=status, message=msg, result=res)