File size: 6,332 Bytes
5120311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
from entity import Docs, Cluster, Preprocess, SummaryInput
from fastapi import FastAPI
import time
import hashlib
import json
from fastapi.middleware.cors import CORSMiddleware
# from function import topic_clustering as tc
from function import topic_clustering_v2 as tc
from iclibs.ic_rabbit import ICRabbitMQ
from get_config import config_params
# FastAPI application that serves the /newsanalysis/* endpoints below.
app = FastAPI()

# CORS policy: any origin, any method and any header is accepted, and
# credentialed (cookie) requests are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is the
# most permissive CORS setup possible — confirm this is intended outside of
# development before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
def get_hash_id(item: Docs):
    """Return a SHA-224 hex digest identifying a clustering request.

    The digest is used as the result-cache key, so it must change whenever
    the documents (by URL) or any request parameter that affects the output
    changes.

    Every component is labelled and newline-separated so that distinct
    requests cannot produce the same pre-image. Bare concatenation was
    ambiguous: e.g. ``top_cluster=1, top_sentence=23`` and
    ``top_cluster=12, top_sentence=3`` both yielded ``"...123..."``, as did
    URL lists ``["ab", "c"]`` and ``["a", "bc"]``.

    Note: digests differ from those produced by the previous format, so
    previously cached entries are no longer matched and are eventually
    pruned from the cache index.

    Args:
        item: the request; ``item.response["docs"]`` is iterated and each
            element's ``"url"`` value (a string) is hashed.

    Returns:
        str: the hexadecimal SHA-224 digest.
    """
    parts = [doc["url"] for doc in item.response["docs"]]
    parts.extend([
        f"top_cluster={item.top_cluster}",
        f"top_sentence={item.top_sentence}",
        f"topn_summary={item.topn_summary}",
        f"top_doc={item.top_doc}",
        f"threshold={item.threshold}",
        # max_doc_per_cluster is forwarded to the clustering call, so it must
        # be part of the cache key as well.
        f"max_doc_per_cluster={item.max_doc_per_cluster}",
    ])
    # A blank (or missing) sorted_field is deliberately left out of the key,
    # as before; `or ""` keeps a None value from crashing on .strip().
    sorted_field = item.sorted_field or ""
    if sorted_field.strip():
        parts.append(f"sorted_field={sorted_field}")
    if item.delete_message:
        parts.append(f"delete_message={item.delete_message}")
    return hashlib.sha224("\n".join(parts).encode("utf-8")).hexdigest()
# In-memory result-cache index: request hash -> {"time": <epoch seconds>,
# "response_path": <path of the cached result file>}. It is restored from
# log_run/log.txt at import time; a missing, unreadable or corrupt index file
# simply starts with an empty cache.
try:
    with open("log_run/log.txt", encoding="utf-8") as f:
        data_dict = json.load(f)
except (OSError, ValueError) as ve:
    # OSError: file absent/unreadable (e.g. first run).
    # ValueError: invalid JSON or undecodable bytes (JSONDecodeError and
    # UnicodeDecodeError are both ValueError subclasses).
    print(ve)
    data_dict = {}
if not isinstance(data_dict, dict):
    # Valid JSON that is not an object (e.g. a list) cannot serve as the
    # index, which is used with dict lookups, assignment and deletion.
    print("ignoring malformed cache index: expected a JSON object")
    data_dict = {}
# Cached results older than this many seconds are dropped from the index.
_CACHE_TTL_SECONDS = 30 * 24 * 3600
# The cache is only consulted for requests with more documents than this.
_CACHE_MIN_DOCS = 200


def _load_cached_result(hash_str):
    """Look up a stored result for ``hash_str`` in the cache index.

    Returns:
        tuple: ``(True, result)`` on a hit, ``(False, None)`` on a miss. A
        flag is returned instead of using None for "miss" because a cached
        JSON ``null`` is a legitimate stored value.

    Read failures (missing/unreadable file, corrupt JSON, malformed index
    entry) are printed and treated as a miss, so the caller recomputes.
    """
    if hash_str not in data_dict:
        return False, None
    try:
        path_res = data_dict[hash_str]["response_path"]
        with open(path_res) as ff:
            return True, json.load(ff)
    except (OSError, ValueError, KeyError, TypeError) as err:
        print(err)
        return False, None


def _store_result(hash_str, results, st_time):
    """Write ``results`` to ``log/result_<hash>.txt`` and index it under ``hash_str``."""
    path_res = "log/result_{0}.txt".format(hash_str)
    with open(path_res, "w+") as ff:
        ff.write(json.dumps(results))
    data_dict[hash_str] = {"time": st_time, "response_path": path_res}


def _is_expired(entry, now):
    """Return True if index ``entry`` is older than the TTL or is malformed."""
    try:
        return now - entry["time"] > _CACHE_TTL_SECONDS
    except (KeyError, TypeError):
        # A malformed entry (missing/non-numeric "time", or not a mapping) is
        # dropped instead of failing the request after clustering already ran.
        return True


def _prune_and_save_index():
    """Drop expired entries from ``data_dict`` and persist it to log_run/log.txt."""
    now = time.time()  # one clock reading for the whole sweep
    expired = [key for key, entry in data_dict.items() if _is_expired(entry, now)]
    for key in expired:
        del data_dict[key]
    # NOTE(review): only the index entry is removed; the corresponding
    # log/result_*.txt file stays on disk — confirm whether it should be deleted.
    with open("log_run/log.txt", "w+") as ff:
        ff.write(json.dumps(data_dict))


@app.post("/newsanalysis/topic_clustering")
async def topic_clustering(item: Docs):
    """Cluster ``item.response["docs"]`` into topics, with an on-disk result cache.

    Requests with more than ``_CACHE_MIN_DOCS`` documents first try to reuse a
    result cached under ``get_hash_id(item)``. Otherwise (or on a cache miss)
    the result is computed with ``tc.topic_clustering``, written to
    ``log/result_<hash>.txt``, recorded in ``data_dict``, and the index is
    pruned of expired entries and saved to ``log_run/log.txt``.

    Returns:
        The value produced by ``tc.topic_clustering`` (or its JSON round-trip
        when served from the cache).
    """
    docs = item.response["docs"]
    print("start ")
    print("len doc: ", len(docs))
    st_time = time.time()
    hash_str = get_hash_id(item)
    print(hash_str)

    if len(docs) > _CACHE_MIN_DOCS:
        found, cached = _load_cached_result(hash_str)
        if found:
            print("time analysis (cache): ", time.time() - st_time)
            return cached

    # NOTE(review): this synchronous call runs inside an ``async def``
    # endpoint, so it holds the event loop for its whole duration; consider a
    # plain ``def`` endpoint or a thread pool once tc is confirmed thread-safe.
    results = tc.topic_clustering(
        docs,
        item.threshold,
        top_cluster=item.top_cluster,
        top_sentence=item.top_sentence,
        topn_summary=item.topn_summary,
        sorted_field=item.sorted_field,
        max_doc_per_cluster=item.max_doc_per_cluster,
        delete_message=item.delete_message,
    )
    _store_result(hash_str, results, st_time)
    _prune_and_save_index()
    print("time analysis: ", time.time() - st_time)
    return results
def init_rabbit_queue(usr, passw, host, vir_host, queue_name, durable, max_priority, exchange=""):
    """Connect to RabbitMQ through ICRabbitMQ and initialise ``queue_name``.

    Args:
        usr, passw: broker credentials.
        host, vir_host: broker host and virtual host.
        queue_name: queue to initialise.
        durable, max_priority: forwarded to ``ICRabbitMQ.init_queue``.
        exchange: exchange name forwarded to ``init_queue`` (default "").

    Returns:
        tuple: ``(channel, connection, queue_name)`` — the value returned by
        ``init_queue`` (used as the channel for ``publish_message``), the
        ``ICRabbitMQ`` instance, and the queue name that was passed in.
    """
    rabbit = ICRabbitMQ(host, vir_host, usr, passw)
    rabbit.init_connection()
    queue_channel = rabbit.init_queue(
        queue_name,
        exchange=exchange,
        durable=durable,
        max_priority=max_priority,
    )
    return queue_channel, rabbit, queue_name
@app.post("/newsanalysis/topic_clustering_2")
async def topic_clustering_v2(item: Docs):
    """Enqueue a topic-clustering job instead of computing it in the request.

    The published payload carries the documents, the clustering parameters,
    the request hash from ``get_hash_id``, the request start time and the
    optional ``meta`` object from ``item.response``. It goes to the queue
    configured under ``config_params['queue_topic_clustering']``.

    Returns:
        dict: ``{"message": "success"}`` once the message has been published.
    """
    response = item.response
    docs = response["docs"]
    meta = response.get('meta', {})
    print("start ")
    print("len doc: ", len(docs))
    st_time = time.time()
    hash_str = get_hash_id(item)

    data_push = dict(
        docs=docs,
        threshold=item.threshold,
        top_cluster=item.top_cluster,
        top_sentence=item.top_sentence,
        topn_summary=item.topn_summary,
        hash_str=hash_str,
        st_time=st_time,
        meta=meta,
    )

    queue_cfg = config_params['queue_topic_clustering']
    # NOTE(review): a new connection is opened on every request and the
    # connection object returned by init_rabbit_queue is never closed here —
    # confirm ICRabbitMQ releases it, otherwise broker connections accumulate.
    channel, _connection, queue = init_rabbit_queue(
        queue_cfg["usr_name"],
        str(queue_cfg["password"]),
        queue_cfg["host"],
        queue_cfg["virtual_host"],
        queue_cfg["queue_name"],
        True,
        10,
    )
    ICRabbitMQ.publish_message(channel, queue, data_push, priority=1, delivery_mode=2, exchange='')
    return {"message": "success"}
@app.post("/newsanalysis/topic_clustering_with_preprocess")
async def topic_clustering_with_preprocess(item: Preprocess):
    """Enqueue a merge-clustering job built from a ``Preprocess`` request.

    The request fields are first dumped to ``preprocess.json`` (overwritten
    on every call). The job payload is then published to the queue configured
    under ``config_params['queue_merge_clustering']``.

    Returns:
        dict: ``{"message": "success"}`` once the message has been published.
    """
    # ensure_ascii=False writes non-ASCII characters verbatim, so the file is
    # opened as UTF-8 explicitly: the platform default encoding may be unable
    # to represent them and would raise UnicodeEncodeError, failing the request.
    # NOTE(review): concurrent requests overwrite the same file — presumably a
    # debugging aid; confirm it is still needed.
    with open('preprocess.json', 'w', encoding='utf-8') as f:
        json.dump(item.__dict__, f, ensure_ascii=False)
    data_push = {
        "type": item.type,
        "threshold": item.threshold,
        "top_cluster": item.top_cluster,
        "benchmark_id": item.benchmark_id,
        "benchmark_children_id": item.benchmark_children_id,
        "source_tagids": item.source_tagids,
        "preprocess": item.preprocess,
        "meta": item.meta
    }
    params = config_params['queue_merge_clustering']
    usr_name = params["usr_name"]
    password = str(params["password"])
    host = params["host"]
    virtual_host = params["virtual_host"]
    queue_name = params["queue_name"]
    # NOTE(review): the connection object (rb_consumer) is never closed here —
    # confirm ICRabbitMQ releases it, otherwise broker connections accumulate.
    channel_consumer, rb_consumer, queue_consumer = init_rabbit_queue(usr_name, password, host, virtual_host, queue_name, True, 10)
    ICRabbitMQ.publish_message(channel_consumer, queue_consumer, data_push, priority=1, delivery_mode=2, exchange='')
    return {"message": "success"}
@app.post("/newsanalysis/summary")
async def summary(item: SummaryInput):
    """Summarise ``item.text`` with ``tc.get_summary_bert``.

    ``item.topn`` is forwarded as ``topn``, with ``lang=None`` and
    ``ratio=0.2``.

    Returns:
        The summary produced by ``tc.get_summary_bert``, or ``''`` if it raised.
    """
    try:
        summary_txt = tc.get_summary_bert(item.text, lang=None, topn=item.topn, ratio=0.2)
    except Exception as ex:
        # Stay best-effort (empty summary on failure), but report the error
        # instead of discarding it silently so failures remain diagnosable.
        print(ex)
        summary_txt = ''
    return summary_txt
|