fixed pipe
lightrag/lightrag.py  +21 -23  CHANGED

@@ -586,7 +586,7 @@ class LightRAG:
         if update_storage:
             await self._insert_done()
 
-    async def apipeline_process_documents(self, string_or_strings):
+    async def apipeline_process_documents(self, string_or_strings: str | list[str]):
         """Input list remove duplicates, generate document IDs and initial pendding status, filter out already stored documents, store docs
         Args:
             string_or_strings: Single document string or list of document strings
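
The only change in this hunk is the type hint on string_or_strings. Per the docstring, this stage normalizes the input, removes duplicates, derives document IDs, and records an initial pending status. Below is a minimal sketch of that normalization step, assuming an md5-based ID helper and a "doc-" prefix; neither detail is shown in this diff, only compute_mdhash_id and DocStatus appear elsewhere in it.

from hashlib import md5


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Stand-in for LightRAG's hash-based ID helper (assumed behaviour).
    return prefix + md5(content.encode()).hexdigest()


def normalize_and_dedup(string_or_strings: str | list[str]) -> dict[str, dict]:
    # Accept one document or a list, strip whitespace, drop duplicates,
    # and key each document by a content-derived ID with an initial status.
    if isinstance(string_or_strings, str):
        string_or_strings = [string_or_strings]
    unique_contents = {s.strip() for s in string_or_strings if s.strip()}
    return {
        compute_mdhash_id(c, prefix="doc-"): {"content": c, "status": "PENDING"}
        for c in unique_contents
    }
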
@@ -628,20 +628,24 @@ class LightRAG:
 
         # 4. Store original document
         for doc_id, doc in new_docs.items():
-            await self.full_docs.upsert(
-
+            await self.full_docs.upsert(
+                {
+                    doc_id: {
+                        "content": doc["content"],
+                        "status": DocStatus.PENDING
+                    }
+                }
+            )
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
     async def apipeline_process_chunks(self):
         """Get pendding documents, split into chunks,insert chunks"""
         # 1. get all pending and failed documents
         _todo_doc_keys = []
-        _failed_doc = await self.full_docs.get_by_status_and_ids(
-            status=DocStatus.FAILED, ids=None
-        )
-        _pendding_doc = await self.full_docs.get_by_status_and_ids(
-            status=DocStatus.PENDING, ids=None
-        )
+
+        _failed_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.FAILED)
+        _pendding_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.PENDING)
+
         if _failed_doc:
             _todo_doc_keys.extend([doc["id"] for doc in _failed_doc])
         if _pendding_doc:
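
The calls above assume a key-value, status-tracking storage contract: upsert() merges per-ID records that carry a "status" field, and get_by_status_and_ids() filters stored records by status (with the ids argument now left at its default). The following is a toy in-memory stand-in that satisfies that contract, for illustration only; it is not one of LightRAG's storage classes, and the DocStatus string values are assumptions.

from enum import Enum


class DocStatus(str, Enum):
    # Status names mirrored from the ones used in this diff; values assumed.
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


class InMemoryStatusStore:
    # Minimal sketch of the storage interface the pipeline code above calls into.
    def __init__(self) -> None:
        self._rows: dict[str, dict] = {}

    async def upsert(self, records: dict[str, dict]) -> None:
        # Merge the given fields into each record, creating it if needed.
        for key, fields in records.items():
            self._rows.setdefault(key, {"id": key}).update(fields)

    async def get_by_status_and_ids(
        self, status: DocStatus, ids: list[str] | None = None
    ) -> list[dict]:
        # Return records matching the status, optionally restricted to ids.
        rows = [r for r in self._rows.values() if r.get("status") == status]
        if ids is None:
            return rows
        return [r for r in rows if r["id"] in ids]
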
@@ -671,7 +675,7 @@
                 compute_mdhash_id(dp["content"], prefix="chunk-"): {
                     **dp,
                     "full_doc_id": doc_id,
-                    "status": DocStatus.
+                    "status": DocStatus.PROCESSED,
                 }
                 for dp in chunking_by_token_size(
                     doc["content"],
@@ -681,17 +685,15 @@
                 )
             }
             chunk_cnt += len(chunks)
-
-            await self.text_chunks.change_status(doc_id, DocStatus.PROCESSING)
-
+
             try:
                 # Store chunks in vector database
                 await self.chunks_vdb.upsert(chunks)
                 # Update doc status
-                await self.
+                await self.text_chunks.upsert({**chunks, "status": DocStatus.PENDING})
             except Exception as e:
                 # Mark as failed if any step fails
-                await self.
+                await self.text_chunks.upsert({**chunks, "status": DocStatus.FAILED})
                 raise e
         except Exception as e:
             import traceback
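
For reference, the dict splat used in the new status writes keeps every chunk entry as-is and appends a single top-level "status" key next to the chunk IDs. The chunk IDs and fields below are invented purely to show the resulting payload shape; the dict semantics themselves are plain Python.

chunks = {
    "chunk-aaa111": {"content": "first chunk", "full_doc_id": "doc-1"},
    "chunk-bbb222": {"content": "second chunk", "full_doc_id": "doc-1"},
}
payload = {**chunks, "status": "pending"}
assert payload == {
    "chunk-aaa111": {"content": "first chunk", "full_doc_id": "doc-1"},
    "chunk-bbb222": {"content": "second chunk", "full_doc_id": "doc-1"},
    "status": "pending",
}
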
@@ -705,12 +707,8 @@
         """Get pendding or failed chunks, extract entities and relationships from each chunk"""
         # 1. get all pending and failed chunks
         _todo_chunk_keys = []
-        _failed_chunks = await self.text_chunks.get_by_status_and_ids(
-            status=DocStatus.FAILED, ids=None
-        )
-        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(
-            status=DocStatus.PENDING, ids=None
-        )
+        _failed_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.FAILED)
+        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.PENDING)
         if _failed_chunks:
             _todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks])
         if _pendding_chunks:
@@ -744,11 +742,11 @@
                 if maybe_new_kg is None:
                     logger.info("No entities or relationships extracted!")
                 # Update status to processed
-                await self.text_chunks.
+                await self.text_chunks.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
             except Exception as e:
                 logger.error("Failed to extract entities and relationships")
                 # Mark as failed if any step fails
-                await self.text_chunks.
+                await self.text_chunks.upsert({chunk_id: {"status": DocStatus.FAILED}})
                 raise e
 
         with tqdm_async(
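
Taken together, the commit keeps the staged, status-driven pipeline: store documents as pending, chunk whatever is pending or failed, then extract entities and relationships from pending or failed chunks. A usage sketch under stated assumptions follows: only apipeline_process_documents and apipeline_process_chunks are named in this diff, so apipeline_extract_entities is a placeholder for the third stage, and the LightRAG constructor is reduced to a working directory.

import asyncio

from lightrag import LightRAG


async def run_pipeline(docs: list[str]) -> None:
    rag = LightRAG(working_dir="./rag_storage")  # assumed minimal setup
    # Stage 1: dedupe the input, assign document IDs, store them as PENDING.
    await rag.apipeline_process_documents(docs)
    # Stage 2: pick up PENDING/FAILED documents, chunk them, store the chunks.
    await rag.apipeline_process_chunks()
    # Stage 3: placeholder name -- process PENDING/FAILED chunks into the graph.
    await rag.apipeline_extract_entities()


if __name__ == "__main__":
    asyncio.run(run_pipeline(["Some document text.", "Another document."]))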