Kevin Hu committed on
Commit dc07a4b · 1 Parent(s): e2bbb9d

Cut down the attempt times of ES (#3550)


### What problem does this PR solve?

#3541
### Type of change


- [x] Refactoring
- [x] Performance Improvement
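This PR replaces the hardcoded retry counts scattered through `rag/utils/es_conn.py` (24 attempts in `__init__`, 100 in the bulk insert, 10 in delete, 3 everywhere else) with a single module-level constant, `ATTEMPT_TIME = 2`; see the diff below. A minimal sketch of the resulting retry shape, assuming elasticsearch-py is installed (the `connect` helper and its default `hosts` value are illustrative, not part of the change; connection options and per-call error handling are elided):

```python
import logging

from elasticsearch import Elasticsearch

ATTEMPT_TIME = 2  # single knob for every ES retry loop in es_conn.py


def connect(hosts: str = "http://localhost:9200") -> Elasticsearch:
    # Same loop shape the diff applies to __init__, indexExist, search, get,
    # insert/bulk, update, delete and sql: a bounded number of attempts.
    for attempt in range(ATTEMPT_TIME):
        try:
            es = Elasticsearch(hosts.split(","))
            if es.ping():
                return es
        except Exception:
            logging.exception("Elasticsearch attempt %d failed", attempt + 1)
    raise ConnectionError(f"Cannot reach Elasticsearch after {ATTEMPT_TIME} attempts")
```

Capping every loop at two attempts bounds the worst-case stall when Elasticsearch is unreachable, and any future tuning of the retry count becomes a one-line edit.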

Files changed (1)
  1. rag/utils/es_conn.py +13 -11
rag/utils/es_conn.py CHANGED
@@ -16,13 +16,15 @@ from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr,
     FusionExpr
 from rag.nlp import is_english, rag_tokenizer
 
+ATTEMPT_TIME = 2
+
 
 @singleton
 class ESConnection(DocStoreConnection):
     def __init__(self):
         self.info = {}
         logging.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
-        for _ in range(24):
+        for _ in range(ATTEMPT_TIME):
             try:
                 self.es = Elasticsearch(
                     settings.ES["hosts"].split(","),
@@ -92,7 +94,7 @@ class ESConnection(DocStoreConnection):
 
     def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
         s = Index(indexName, self.es)
-        for i in range(3):
+        for i in range(ATTEMPT_TIME):
             try:
                 return s.exists()
             except Exception as e:
@@ -144,9 +146,9 @@ class ESConnection(DocStoreConnection):
                 if "minimum_should_match" in m.extra_options:
                     minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
                 bqry.must.append(Q("query_string", fields=m.fields,
-                                   type="best_fields", query=m.matching_text,
-                                   minimum_should_match=minimum_should_match,
-                                   boost=1))
+                                   type="best_fields", query=m.matching_text,
+                                   minimum_should_match=minimum_should_match,
+                                   boost=1))
                 bqry.boost = 1.0 - vector_similarity_weight
 
             elif isinstance(m, MatchDenseExpr):
@@ -180,7 +182,7 @@ class ESConnection(DocStoreConnection):
         q = s.to_dict()
         logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
 
-        for i in range(3):
+        for i in range(ATTEMPT_TIME):
             try:
                 res = self.es.search(index=indexNames,
                                      body=q,
@@ -201,7 +203,7 @@ class ESConnection(DocStoreConnection):
         raise Exception("ESConnection.search timeout.")
 
     def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
-        for i in range(3):
+        for i in range(ATTEMPT_TIME):
             try:
                 res = self.es.get(index=(indexName),
                                   id=chunkId, source=True, )
@@ -233,7 +235,7 @@ class ESConnection(DocStoreConnection):
             operations.append(d_copy)
 
         res = []
-        for _ in range(100):
+        for _ in range(ATTEMPT_TIME):
             try:
                 r = self.es.bulk(index=(indexName), operations=operations,
                                  refresh=False, timeout="600s")
@@ -258,7 +260,7 @@ class ESConnection(DocStoreConnection):
        if "id" in condition and isinstance(condition["id"], str):
            # update specific single document
            chunkId = condition["id"]
-           for i in range(3):
+           for i in range(ATTEMPT_TIME):
                try:
                    self.es.update(index=indexName, id=chunkId, doc=doc)
                    return True
@@ -326,7 +328,7 @@ class ESConnection(DocStoreConnection):
            else:
                raise Exception("Condition value must be int, str or list.")
        logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
-       for _ in range(10):
+       for _ in range(ATTEMPT_TIME):
            try:
                res = self.es.delete_by_query(
                    index=indexName,
@@ -437,7 +439,7 @@ class ESConnection(DocStoreConnection):
            sql = sql.replace(p, r, 1)
        logging.debug(f"ESConnection.sql to es: {sql}")
 
-       for i in range(3):
+       for i in range(ATTEMPT_TIME):
            try:
                res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format,
                                        request_timeout="2s")
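The same `for _ in range(ATTEMPT_TIME): try/except` shape now appears in eight methods of `ESConnection`. It is not part of this commit, but as an illustration the repetition could be wrapped in a small helper along these lines (`_retry`, its parameters, and the fixed one-second back-off are hypothetical):

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")

ATTEMPT_TIME = 2


def _retry(op: Callable[[], T], what: str, delay: float = 1.0) -> T:
    """Bounded-retry wrapper mirroring the loops in ESConnection (hypothetical)."""
    last_exc: Exception | None = None
    for attempt in range(ATTEMPT_TIME):
        try:
            return op()
        except Exception as e:
            last_exc = e
            logging.warning("%s failed (attempt %d/%d): %s",
                            what, attempt + 1, ATTEMPT_TIME, e)
            time.sleep(delay)
    raise Exception(f"{what} failed after {ATTEMPT_TIME} attempts") from last_exc
```

A call site such as the search above would then read `res = _retry(lambda: self.es.search(index=indexNames, body=q), "ESConnection.search")`.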