ragflow / python /nlp /synonym.py
KevinHuSh
init python part (#7)
f4456af
raw
history blame
1.76 kB
import json
import time
import logging
import re
class Dealer:
    """Synonym lookup backed by a local JSON file and, optionally, redis.

    At construction time a synonym dictionary is read from the first readable
    ``synonym.json`` among a few relative paths. If a redis-like client is
    supplied, the dictionary is additionally replaced by the JSON stored under
    the ``kevin_synonyms`` key, and re-fetched from there on a throttled basis
    while lookups are performed.

    Attributes:
        dictionary: mapping of normalized token -> synonym(s); each value is
            either a single string or whatever the JSON source holds.
        redis: the client passed to the constructor (or ``None``).
        lookup_num: number of ``lookup`` calls since the last redis fetch.
        load_tm: ``time.time()`` of the last redis fetch attempt.
    """

    # Candidate file locations, tried in order; the first one that parses to a
    # JSON object wins. The two ``res`` paths keep the precedence they always
    # had. ``./synonym.json`` is tried last: the previous code read it but
    # then unconditionally overwrote the result, so it never took effect.
    _DICT_PATHS = ("./res/synonym.json", "../res/synonym.json",
                   "./synonym.json")
    _REDIS_KEY = "kevin_synonyms"
    _MIN_LOOKUPS = 100        # lookups required between two redis fetches
    _RELOAD_INTERVAL = 3600   # seconds required between two redis fetches
    _BLANKS = re.compile(r"[ \t]+")

    def __init__(self, redis=None):
        """Load the local dictionary and, if possible, the redis one.

        Args:
            redis: optional client object exposing ``get(key)``; when falsy,
                only the local file is used and a warning is logged.
        """
        # Seed the throttle counters so that the very first load() call is
        # allowed to hit redis immediately.
        self.lookup_num = 100000000
        self.load_tm = time.time() - 1000000
        self.dictionary = self._load_local()

        if not redis:
            logging.warning(
                "Realtime synonym is disabled, since no redis connection.")
        if not self.dictionary:
            logging.warning("Fail to load synonym")

        self.redis = redis
        self.load()

    @classmethod
    def _load_local(cls):
        """Return the first local synonym file that parses to a dict, else {}."""
        for path in cls._DICT_PATHS:
            try:
                # ``with`` closes the handle; JSON text is UTF-8 by spec, so
                # do not depend on the platform's locale encoding.
                with open(path, 'r', encoding="utf-8") as fh:
                    loaded = json.load(fh)
            except (OSError, ValueError):
                # Missing/unreadable file or malformed JSON: try the next one.
                continue
            if isinstance(loaded, dict):
                return loaded
        logging.warning("Miss synonym.json")
        return {}

    def load(self):
        """Refresh ``dictionary`` from redis, subject to throttling.

        Does nothing when there is no redis client, when fewer than 100
        lookups happened since the previous fetch, or when the previous fetch
        was less than 3600 seconds ago. Otherwise the throttle counters are
        reset and the ``kevin_synonyms`` value is fetched; an empty value, a
        payload that is not valid JSON, or one that is not a JSON object
        leaves the current dictionary untouched. Exceptions raised by
        ``redis.get`` itself are not caught here.
        """
        if not self.redis:
            return
        if self.lookup_num < self._MIN_LOOKUPS:
            return
        now = time.time()
        if now - self.load_tm < self._RELOAD_INTERVAL:
            return

        # Reset the throttle before fetching so that a failed fetch is not
        # retried on every subsequent lookup.
        self.load_tm = now
        self.lookup_num = 0

        payload = self.redis.get(self._REDIS_KEY)
        if not payload:
            return
        try:
            fresh = json.loads(payload)
        except Exception as e:
            logging.error("Fail to load synonym!" + str(e))
            return
        if not isinstance(fresh, dict):
            # lookup() relies on ``.get``; refuse anything that is not a
            # mapping instead of breaking every later lookup.
            logging.error("Fail to load synonym!" +
                          "expected a JSON object, got " +
                          type(fresh).__name__)
            return
        self.dictionary = fresh

    def lookup(self, tk):
        """Return the synonyms recorded for token ``tk``.

        The token is lower-cased and every run of spaces/tabs is collapsed to
        a single space before the dictionary is consulted. Each call counts
        towards the redis refresh throttle and may trigger a refresh.

        Args:
            tk: the token (string) to look up.

        Returns:
            ``[]`` when the token is unknown; a one-element list when the
            stored value is a single string; otherwise the stored value as is.
        """
        self.lookup_num += 1
        self.load()
        key = self._BLANKS.sub(" ", tk.lower())
        res = self.dictionary.get(key, [])
        if isinstance(res, str):
            res = [res]
        return res
if __name__ == "__main__":
    # Manual smoke check: build a Dealer without a redis client and print
    # whatever synonym dictionary it ended up with.
    dealer = Dealer()
    print(dealer.dictionary)