yangdx commited on
Commit
6e653a1
·
1 Parent(s): a60affe

No need the await entity_relation_task first

Browse files
Files changed (1) hide show
  1. lightrag/lightrag.py +29 -12
lightrag/lightrag.py CHANGED
@@ -860,18 +860,28 @@ class LightRAG:
860
  }
861
  )
862
  )
863
- chunks_vdb_task = asyncio.create_task(self.chunks_vdb.upsert(chunks))
864
- entity_relation_task = asyncio.create_task(self._process_entity_relation_graph(chunks))
 
 
 
 
865
  full_docs_task = asyncio.create_task(
866
- self.full_docs.upsert({doc_id: {"content": status_doc.content}})
 
 
867
  )
868
- text_chunks_task = asyncio.create_task(self.text_chunks.upsert(chunks))
869
-
870
- tasks = [doc_status_task, chunks_vdb_task, entity_relation_task, full_docs_task, text_chunks_task]
 
 
 
 
 
 
 
871
  try:
872
- # Wait for entity_relation_task first as it's critical
873
- await entity_relation_task
874
- # If successful, wait for other tasks
875
  await asyncio.gather(*tasks)
876
  await self.doc_status.upsert(
877
  {
@@ -888,13 +898,20 @@ class LightRAG:
888
  )
889
  except Exception as e:
890
  # Log error and update pipeline status
891
- error_msg = f"Failed to process document {doc_id}: {str(e)}"
 
 
892
  logger.error(error_msg)
893
  pipeline_status["latest_message"] = error_msg
894
  pipeline_status["history_messages"].append(error_msg)
895
 
896
  # Cancel other tasks as they are no longer meaningful
897
- for task in [chunks_vdb_task, full_docs_task, text_chunks_task]:
 
 
 
 
 
898
  if not task.done():
899
  task.cancel()
900
 
@@ -941,7 +958,7 @@ class LightRAG:
941
  pipeline_status["latest_message"] = log_message
942
  pipeline_status["history_messages"].append(log_message)
943
 
944
- # 获取新的待处理文档
945
  processing_docs, failed_docs, pending_docs = await asyncio.gather(
946
  self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
947
  self.doc_status.get_docs_by_status(DocStatus.FAILED),
 
860
  }
861
  )
862
  )
863
+ chunks_vdb_task = asyncio.create_task(
864
+ self.chunks_vdb.upsert(chunks)
865
+ )
866
+ entity_relation_task = asyncio.create_task(
867
+ self._process_entity_relation_graph(chunks)
868
+ )
869
  full_docs_task = asyncio.create_task(
870
+ self.full_docs.upsert(
871
+ {doc_id: {"content": status_doc.content}}
872
+ )
873
  )
874
+ text_chunks_task = asyncio.create_task(
875
+ self.text_chunks.upsert(chunks)
876
+ )
877
+ tasks = [
878
+ doc_status_task,
879
+ chunks_vdb_task,
880
+ entity_relation_task,
881
+ full_docs_task,
882
+ text_chunks_task,
883
+ ]
884
  try:
 
 
 
885
  await asyncio.gather(*tasks)
886
  await self.doc_status.upsert(
887
  {
 
898
  )
899
  except Exception as e:
900
  # Log error and update pipeline status
901
+ error_msg = (
902
+ f"Failed to process document {doc_id}: {str(e)}"
903
+ )
904
  logger.error(error_msg)
905
  pipeline_status["latest_message"] = error_msg
906
  pipeline_status["history_messages"].append(error_msg)
907
 
908
  # Cancel other tasks as they are no longer meaningful
909
+ for task in [
910
+ chunks_vdb_task,
911
+ entity_relation_task,
912
+ full_docs_task,
913
+ text_chunks_task,
914
+ ]:
915
  if not task.done():
916
  task.cancel()
917
 
 
958
  pipeline_status["latest_message"] = log_message
959
  pipeline_status["history_messages"].append(log_message)
960
 
961
+ # Check for pending documents again
962
  processing_docs, failed_docs, pending_docs = await asyncio.gather(
963
  self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
964
  self.doc_status.get_docs_by_status(DocStatus.FAILED),