cleaned code
- lightrag/kg/postgres_impl.py +4 -2
- lightrag/lightrag.py +3 -3
lightrag/kg/postgres_impl.py
CHANGED
```diff
@@ -443,6 +443,7 @@ class PGDocStatusStorage(DocStatusStorage):
             return {}
         else:
             return DocProcessingStatus(
+                content=result[0]["content"],
                 content_length=result[0]["content_length"],
                 content_summary=result[0]["content_summary"],
                 status=result[0]["status"],
@@ -471,10 +472,9 @@ class PGDocStatusStorage(DocStatusStorage):
         sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$1"
         params = {"workspace": self.db.workspace, "status": status}
         result = await self.db.query(sql, params, True)
-        # Result is like [{'id': 'id1', 'status': 'PENDING', 'updated_at': '2023-07-01 00:00:00'}, {'id': 'id2', 'status': 'PENDING', 'updated_at': '2023-07-01 00:00:00'}, ...]
-        # Converting to be a dict
         return {
             element["id"]: DocProcessingStatus(
+                content=result[0]["content"],
                 content_summary=element["content_summary"],
                 content_length=element["content_length"],
                 status=element["status"],
@@ -506,6 +506,7 @@ class PGDocStatusStorage(DocStatusStorage):
         sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status)
                  values($1,$2,$3,$4,$5,$6)
                   on conflict(id,workspace) do update set
+                  content = EXCLUDED.content,
                   content_summary = EXCLUDED.content_summary,
                   content_length = EXCLUDED.content_length,
                   chunks_count = EXCLUDED.chunks_count,
@@ -518,6 +519,7 @@ class PGDocStatusStorage(DocStatusStorage):
             {
                 "workspace": self.db.workspace,
                 "id": k,
+                "content": v["content"],
                 "content_summary": v["content_summary"],
                 "content_length": v["content_length"],
                 "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
```
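Taken together, these changes persist the full document `content` alongside the status metadata and return it when statuses are read back. A minimal sketch of the round trip, assuming a `DocProcessingStatus` dataclass with the fields used above; `storage`, `round_trip`, and the `get_by_id` lookup are illustrative stand-ins, not the library's confirmed API:

```python
from dataclasses import dataclass

@dataclass
class DocProcessingStatus:
    content: str            # full document text, now persisted in LIGHTRAG_DOC_STATUS
    content_summary: str    # short preview of the document
    content_length: int
    status: str
    chunks_count: int = -1  # -1 until the document has been chunked

# Illustrative round trip against a PGDocStatusStorage-like object.
async def round_trip(storage, doc_id: str, text: str) -> DocProcessingStatus:
    await storage.upsert(
        {
            doc_id: {
                "content": text,                # new field carried through the upsert
                "content_summary": text[:100],
                "content_length": len(text),
                "status": "PENDING",
            }
        }
    )
    # Reads now populate DocProcessingStatus.content as well.
    return await storage.get_by_id(doc_id)
```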
lightrag/lightrag.py
CHANGED
```diff
@@ -544,6 +544,7 @@ class LightRAG:
             return
 
         to_process_docs_ids = list(to_process_docs.keys())
+
         # Get allready processed documents (text chunks and full docs)
         text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
             to_process_docs_ids
@@ -570,9 +571,8 @@
                 ids_doc_processing_status,
                 desc=f"Process Batch {batch_idx}",
             ):
-                # Update status in processing
                 id_doc, status_doc = id_doc_processing_status
-
+                # Update status in processing
                 await self.doc_status.upsert(
                     {
                         id_doc: {
@@ -601,8 +601,8 @@
                         }
 
                 # Ensure chunk insertion and graph processing happen sequentially, not in parallel
-                await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)
+                await self._process_entity_relation_graph(chunks)
 
                 tasks[id_doc] = []
                 # Check if document already processed the doc
```
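The lightrag.py change swaps the order of the two awaits so that chunk vectors are written before entity/relation extraction runs. A brief sketch of the difference, assuming both operations are coroutines; the standalone function and its parameters are illustrative, not the class's actual structure:

```python
async def index_chunks(chunks, chunks_vdb, process_entity_relation_graph):
    # Concurrent scheduling (what the comment warns against): graph extraction
    # could start before the chunks are in the vector store.
    #
    #   await asyncio.gather(
    #       chunks_vdb.upsert(chunks),
    #       process_entity_relation_graph(chunks),
    #   )

    # Sequential order after this commit: insert chunks first, then run
    # entity/relation extraction over the same chunks.
    await chunks_vdb.upsert(chunks)
    await process_entity_relation_graph(chunks)
```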
|