from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from entity import InputHotTopic, ResponseQueue from threading import Thread from queue import Queue import time import json, requests from service_cache_Thien import get_data_sorl import os from datetime import datetime from email_validator import push_msg_tele import time import json import hashlib from pydantic import BaseModel class InputHotTopic(BaseModel): start_time: str = "2024-09-03 23:00:00" end_time: str = "2024-09-05 23:00:00" query: str = "Giá nhà chung cư trên Hà Nội" keywords: list = ["chung cư, Hà Nội", "Hoà Lạc"] top_cluster: int = 5 prompt: str = """Trong 300 từ, hãy tổng hợp thành một đoạn văn một cách đầy đủ, chi tiết, và trung thực về các chủ đề xung quanh biến động giá nhà chung cư Hà Nội từ nội dung dưới đây. Nếu không có thông tin gì liên quan đến giá nhà chung cư Hà Nội trong nội dung cung cấp thì trả lời "không có thông tin". Không đưa quan điểm cá nhân, không lặp lại một phần câu hỏi, loại bỏ phần mở đầu. Không có những câu từ liên kết như: "Sau đây là nội dung tóm tắt", "Nội dung tóm tắt là", "Dưới đây là " ... """ check_relevent: str = "Hãy đánh giá nội dung dưới đây có thông tin liên quan đến giá cả nhà chung cư Hà Nội hay không? Chỉ trả lời có hoặc không, không đưa thêm thông tin không liên quan" max_posts: int = 5000 def get_hash_id(item: InputHotTopic): str_hash = "" if item.id_topic: str_hash += item.id_topic str_hash += item.start_time return hashlib.sha224(str_hash.encode("utf-8")).hexdigest() else: return "" class SessionProcess(object): def __init__(self): self.session = dict() def hash_session(self, query: InputHotTopic): hash_dict = query.dict() hash_dict['time'] = int(time.time()) return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest() def insert_session(self, data_input): print('data_input: ', data_input) # if self.mode == "command_center": # hash_id = hash_session(data_input) # else: hash_id = self.hash_session(data_input) if hash_id not in self.session: self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(), "result": {}, "data": data_input} return hash_id def get_info_session(self, hash_id: str): if hash_id in self.session: return self.session[hash_id] return {"status": -2, "result": {}, "meta": {}} def update_session(self, hash_id: str, result: dict, status: int): if hash_id in self.session: self.session[hash_id]["status"] = status self.session[hash_id]["result"] = result self.session[hash_id]["update_time"] = time.time() return True return False def delete_session(self, hash_id: str): if hash_id in self.session: del self.session[hash_id] return True return False SESSION = SessionProcess() app = FastAPI(title="Hot Topic") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) NUM_OF_THREAD = 2 QQ = Queue(maxsize=0) # don't limit queue def process_wc(): print('Run thr') global SESSION, QQ while True: if not QQ.empty(): hash_id = QQ.get() SESSION.update_session(hash_id, {}, 0) print("update trạng thái status = 0: đang xử lý") try: ss_info = SESSION.get_info_session(hash_id) status = ss_info["status"] print("trạng thái hiện tại: ", status) if status == 0: data_input = SESSION.session[hash_id]["data"] res_doc = get_data_sorl(data_input.query, data_input.keywords, data_input.start_time, data_input.end_time, max_posts = data_input.max_posts) print('lenght res_doc: ', len(res_doc)) if not res_doc: SESSION.update_session(hash_id, {}, -1) else: # start_time: str = "2024-03-03 23:00:00" current_time = datetime.now() time_now = current_time.strftime("%Y-%m-%d %H:%M:%S") d = { "id_topic": "99999", "start_time": time_now, "end_time": data_input.end_time, "threshold": 0.3, "top_sentence": -1, "top_cluster": data_input.top_cluster, "topn_summary": 10, "type_cluster": "", "lang_process": "", "prompt": data_input.prompt, "topic_name": data_input.check_relevent, "responseHeader": {}, "benchmark_topics": [], "response": {"docs": res_doc} } str_hash = "" str_hash += "99999" str_hash += time_now hash_id_path = hashlib.sha224(str_hash.encode("utf-8")).hexdigest() st_time = time.time() try: response = requests.post('http://10.9.3.241:8636/newsanalysis/topic_clustering', json=d, timeout=5) except: print("Timeout done") print("push done msg") res_clus = {} # flag = False # count = 0 # while not flag and count < 18000: # if os.path.exists("/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)): # path_res = "/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path) # with open(path_res, encoding="utf-8") as ff: # res_clus = json.load(ff) # res_clus["num_articles"] = len(res_doc) # message = "Hello" # push_msg_tele(data_input.bot_token , data_input.chat_id , message) # print('done processing result') # flag = True # time.sleep(1) # count +=1 # print('sleep: ', count) print("update done msg") SESSION.update_session(hash_id_path, res_clus, 1) except Exception as ve_: print(ve_) SESSION.update_session(hash_id_path, {}, -1) raise ve_ else: time.sleep(2) for _ in range(NUM_OF_THREAD): worker = Thread(target=process_wc, args=()) worker.setDaemon(True) worker.start() @app.post("/api/v1/send_message") def send_requests(item: InputHotTopic): global SESSION hash_id = SESSION.insert_session(item) if SESSION.session[hash_id]["status"] == 0: QQ.put(hash_id) return ResponseQueue(statusCode=1, message="Push to queue done !", result={"hash_id": hash_id}) class InputSession(BaseModel): hash_id: str = "" class Response(BaseModel): statusCode: int = 200 message: str = "" result: dict = {} @app.post("/api/mining/qna/result") def get_result(item: InputSession): global SESSION res = SESSION.get_info_session(item.hash_id) status = res["status"] res = res["result"] if status == -1: msg = "ERROR" elif status == 0: msg = "processing ..." elif status == 1: msg = "done" # SESSION.delete_session(item.hash_id) else: msg = "nothing" return Response(statusCode=status, message=msg, result=res)