zhichyu commited on
Commit
aebd986
·
1 Parent(s): cf6b668

Added kb_id filter to knn. Fix #3458 (#3513)

Browse files

### What problem does this PR solve?

Added kb_id filter to knn. Fix #3458

- [x] Bug Fix (non-breaking change which fixes an issue)

deepdoc/parser/pdf_parser.py CHANGED
@@ -757,7 +757,7 @@ class RAGFlowPdfParser:
757
  if ii is not None:
758
  b = louts[ii]
759
  else:
760
- logging.warn(
761
  f"Missing layout match: {pn + 1},%s" %
762
  (bxs[0].get(
763
  "layoutno", "")))
 
757
  if ii is not None:
758
  b = louts[ii]
759
  else:
760
+ logging.warning(
761
  f"Missing layout match: {pn + 1},%s" %
762
  (bxs[0].get(
763
  "layoutno", "")))
rag/nlp/synonym.py CHANGED
@@ -33,7 +33,7 @@ class Dealer:
33
  try:
34
  self.dictionary = json.load(open(path, 'r'))
35
  except Exception:
36
- logging.warn("Missing synonym.json")
37
  self.dictionary = {}
38
 
39
  if not redis:
 
33
  try:
34
  self.dictionary = json.load(open(path, 'r'))
35
  except Exception:
36
+ logging.warning("Missing synonym.json")
37
  self.dictionary = {}
38
 
39
  if not redis:
rag/utils/es_conn.py CHANGED
@@ -35,7 +35,7 @@ class ESConnection(DocStoreConnection):
35
  self.info = self.es.info()
36
  break
37
  except Exception as e:
38
- logging.warn(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
39
  time.sleep(5)
40
  if not self.es.ping():
41
  msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s."
@@ -80,7 +80,7 @@ class ESConnection(DocStoreConnection):
80
  settings=self.mapping["settings"],
81
  mappings=self.mapping["mappings"])
82
  except Exception:
83
- logging.exception("ES create index error %s" % (indexName))
84
 
85
  def deleteIdx(self, indexName: str, knowledgebaseId: str):
86
  try:
@@ -88,7 +88,7 @@ class ESConnection(DocStoreConnection):
88
  except NotFoundError:
89
  pass
90
  except Exception:
91
- logging.exception("ES delete index error %s" % (indexName))
92
 
93
  def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
94
  s = Index(indexName, self.es)
@@ -96,7 +96,7 @@ class ESConnection(DocStoreConnection):
96
  try:
97
  return s.exists()
98
  except Exception as e:
99
- logging.exception("ES indexExist")
100
  if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
101
  continue
102
  return False
@@ -115,8 +115,21 @@ class ESConnection(DocStoreConnection):
115
  indexNames = indexNames.split(",")
116
  assert isinstance(indexNames, list) and len(indexNames) > 0
117
  assert "_id" not in condition
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  s = Search()
119
- bqry = None
120
  vector_similarity_weight = 0.5
121
  for m in matchExprs:
122
  if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
@@ -130,13 +143,12 @@ class ESConnection(DocStoreConnection):
130
  minimum_should_match = "0%"
131
  if "minimum_should_match" in m.extra_options:
132
  minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
133
- bqry = Q("bool",
134
- must=Q("query_string", fields=m.fields,
135
  type="best_fields", query=m.matching_text,
136
  minimum_should_match=minimum_should_match,
137
- boost=1),
138
- boost=1.0 - vector_similarity_weight,
139
- )
140
  elif isinstance(m, MatchDenseExpr):
141
  assert (bqry is not None)
142
  similarity = 0.0
@@ -150,21 +162,6 @@ class ESConnection(DocStoreConnection):
150
  similarity=similarity,
151
  )
152
 
153
- condition["kb_id"] = knowledgebaseIds
154
- if condition:
155
- if not bqry:
156
- bqry = Q("bool", must=[])
157
- for k, v in condition.items():
158
- if not isinstance(k, str) or not v:
159
- continue
160
- if isinstance(v, list):
161
- bqry.filter.append(Q("terms", **{k: v}))
162
- elif isinstance(v, str) or isinstance(v, int):
163
- bqry.filter.append(Q("term", **{k: v}))
164
- else:
165
- raise Exception(
166
- f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.")
167
-
168
  if bqry:
169
  s = s.query(bqry)
170
  for field in highlightFields:
@@ -181,8 +178,7 @@ class ESConnection(DocStoreConnection):
181
  if limit > 0:
182
  s = s[offset:limit]
183
  q = s.to_dict()
184
- print(json.dumps(q), flush=True)
185
- logging.debug("ESConnection.search [Q]: " + json.dumps(q))
186
 
187
  for i in range(3):
188
  try:
@@ -194,15 +190,15 @@ class ESConnection(DocStoreConnection):
194
  _source=True)
195
  if str(res.get("timed_out", "")).lower() == "true":
196
  raise Exception("Es Timeout.")
197
- logging.debug("ESConnection.search res: " + str(res))
198
  return res
199
  except Exception as e:
200
- logging.exception("ES search [Q]: " + str(q))
201
  if str(e).find("Timeout") > 0:
202
  continue
203
  raise e
204
- logging.error("ES search timeout for 3 times!")
205
- raise Exception("ES search timeout.")
206
 
207
  def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
208
  for i in range(3):
@@ -217,12 +213,12 @@ class ESConnection(DocStoreConnection):
217
  chunk["id"] = chunkId
218
  return chunk
219
  except Exception as e:
220
- logging.exception(f"ES get({chunkId}) got exception")
221
  if str(e).find("Timeout") > 0:
222
  continue
223
  raise e
224
- logging.error("ES search timeout for 3 times!")
225
- raise Exception("ES search timeout.")
226
 
227
  def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]:
228
  # Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
@@ -250,7 +246,7 @@ class ESConnection(DocStoreConnection):
250
  res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
251
  return res
252
  except Exception as e:
253
- logging.warning("Fail to bulk: " + str(e))
254
  if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
255
  time.sleep(3)
256
  continue
@@ -268,7 +264,7 @@ class ESConnection(DocStoreConnection):
268
  return True
269
  except Exception as e:
270
  logging.exception(
271
- f"ES failed to update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)})")
272
  if str(e).find("Timeout") > 0:
273
  continue
274
  else:
@@ -307,7 +303,7 @@ class ESConnection(DocStoreConnection):
307
  _ = ubq.execute()
308
  return True
309
  except Exception as e:
310
- logging.error("ES update exception: " + str(e) + "[Q]:" + str(bqry.to_dict()))
311
  if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
312
  continue
313
  return False
@@ -329,7 +325,7 @@ class ESConnection(DocStoreConnection):
329
  qry.must.append(Q("term", **{k: v}))
330
  else:
331
  raise Exception("Condition value must be int, str or list.")
332
- logging.debug("ESConnection.delete [Q]: " + json.dumps(qry.to_dict()))
333
  for _ in range(10):
334
  try:
335
  res = self.es.delete_by_query(
@@ -338,7 +334,7 @@ class ESConnection(DocStoreConnection):
338
  refresh=True)
339
  return res["deleted"]
340
  except Exception as e:
341
- logging.warning("Fail to delete: " + str(filter) + str(e))
342
  if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
343
  time.sleep(3)
344
  continue
@@ -447,10 +443,10 @@ class ESConnection(DocStoreConnection):
447
  request_timeout="2s")
448
  return res
449
  except ConnectionTimeout:
450
- logging.exception("ESConnection.sql timeout [Q]: " + sql)
451
  continue
452
  except Exception:
453
- logging.exception("ESConnection.sql got exception [Q]: " + sql)
454
  return None
455
  logging.error("ESConnection.sql timeout for 3 times!")
456
  return None
 
35
  self.info = self.es.info()
36
  break
37
  except Exception as e:
38
+ logging.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
39
  time.sleep(5)
40
  if not self.es.ping():
41
  msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s."
 
80
  settings=self.mapping["settings"],
81
  mappings=self.mapping["mappings"])
82
  except Exception:
83
+ logging.exception("ESConnection.createIndex error %s" % (indexName))
84
 
85
  def deleteIdx(self, indexName: str, knowledgebaseId: str):
86
  try:
 
88
  except NotFoundError:
89
  pass
90
  except Exception:
91
+ logging.exception("ESConnection.deleteIdx error %s" % (indexName))
92
 
93
  def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
94
  s = Index(indexName, self.es)
 
96
  try:
97
  return s.exists()
98
  except Exception as e:
99
+ logging.exception("ESConnection.indexExist got exception")
100
  if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
101
  continue
102
  return False
 
115
  indexNames = indexNames.split(",")
116
  assert isinstance(indexNames, list) and len(indexNames) > 0
117
  assert "_id" not in condition
118
+
119
+ bqry = Q("bool", must=[])
120
+ condition["kb_id"] = knowledgebaseIds
121
+ for k, v in condition.items():
122
+ if not isinstance(k, str) or not v:
123
+ continue
124
+ if isinstance(v, list):
125
+ bqry.filter.append(Q("terms", **{k: v}))
126
+ elif isinstance(v, str) or isinstance(v, int):
127
+ bqry.filter.append(Q("term", **{k: v}))
128
+ else:
129
+ raise Exception(
130
+ f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.")
131
+
132
  s = Search()
 
133
  vector_similarity_weight = 0.5
134
  for m in matchExprs:
135
  if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
 
143
  minimum_should_match = "0%"
144
  if "minimum_should_match" in m.extra_options:
145
  minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
146
+ bqry.must.append(Q("query_string", fields=m.fields,
 
147
  type="best_fields", query=m.matching_text,
148
  minimum_should_match=minimum_should_match,
149
+ boost=1))
150
+ bqry.boost = 1.0 - vector_similarity_weight
151
+
152
  elif isinstance(m, MatchDenseExpr):
153
  assert (bqry is not None)
154
  similarity = 0.0
 
162
  similarity=similarity,
163
  )
164
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
165
  if bqry:
166
  s = s.query(bqry)
167
  for field in highlightFields:
 
178
  if limit > 0:
179
  s = s[offset:limit]
180
  q = s.to_dict()
181
+ logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
 
182
 
183
  for i in range(3):
184
  try:
 
190
  _source=True)
191
  if str(res.get("timed_out", "")).lower() == "true":
192
  raise Exception("Es Timeout.")
193
+ logging.debug(f"ESConnection.search {str(indexNames)} res: " + str(res))
194
  return res
195
  except Exception as e:
196
+ logging.exception(f"ESConnection.search {str(indexNames)} query: " + str(q))
197
  if str(e).find("Timeout") > 0:
198
  continue
199
  raise e
200
+ logging.error("ESConnection.search timeout for 3 times!")
201
+ raise Exception("ESConnection.search timeout.")
202
 
203
  def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
204
  for i in range(3):
 
213
  chunk["id"] = chunkId
214
  return chunk
215
  except Exception as e:
216
+ logging.exception(f"ESConnection.get({chunkId}) got exception")
217
  if str(e).find("Timeout") > 0:
218
  continue
219
  raise e
220
+ logging.error("ESConnection.get timeout for 3 times!")
221
+ raise Exception("ESConnection.get timeout.")
222
 
223
  def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]:
224
  # Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 
246
  res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
247
  return res
248
  except Exception as e:
249
+ logging.warning("ESConnection.insert got exception: " + str(e))
250
  if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
251
  time.sleep(3)
252
  continue
 
264
  return True
265
  except Exception as e:
266
  logging.exception(
267
+ f"ESConnection.update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)}) got exception")
268
  if str(e).find("Timeout") > 0:
269
  continue
270
  else:
 
303
  _ = ubq.execute()
304
  return True
305
  except Exception as e:
306
+ logging.error("ESConnection.update got exception: " + str(e))
307
  if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
308
  continue
309
  return False
 
325
  qry.must.append(Q("term", **{k: v}))
326
  else:
327
  raise Exception("Condition value must be int, str or list.")
328
+ logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
329
  for _ in range(10):
330
  try:
331
  res = self.es.delete_by_query(
 
334
  refresh=True)
335
  return res["deleted"]
336
  except Exception as e:
337
+ logging.warning("ESConnection.delete got exception: " + str(e))
338
  if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
339
  time.sleep(3)
340
  continue
 
443
  request_timeout="2s")
444
  return res
445
  except ConnectionTimeout:
446
+ logging.exception("ESConnection.sql timeout")
447
  continue
448
  except Exception:
449
+ logging.exception("ESConnection.sql got exception")
450
  return None
451
  logging.error("ESConnection.sql timeout for 3 times!")
452
  return None