feat: Optimize document deletion performance
To enhance performance during document deletion, new batch-get methods, `get_nodes_by_chunk_ids` and `get_edges_by_chunk_ids`, have been added to the graph storage layer (`BaseGraphStorage` and its implementations). The [`adelete_by_doc_id`](lightrag/lightrag.py:1681) function now leverages these methods to avoid unnecessary iteration over the entire knowledge graph, significantly improving efficiency.
- Graph storage backends updated: NetworkX, Neo4j, PostgreSQL (Apache AGE)
- lightrag/base.py +62 -0
- lightrag/constants.py +3 -0
- lightrag/kg/neo4j_impl.py +42 -0
- lightrag/kg/networkx_impl.py +28 -0
- lightrag/kg/postgres_impl.py +97 -10
- lightrag/lightrag.py +39 -53
- lightrag/operate.py +2 -1
- lightrag/prompt.py +0 -1
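
As a rough usage sketch of the new batch-get API (not the exact `adelete_by_doc_id` code — the helper name `classify_affected_graph` and the return shape are illustrative assumptions; the dictionary keys `id`, `source`, `target`, and `source_id` follow the diffs below):

```python
from lightrag.constants import GRAPH_FIELD_SEP  # introduced by this commit


async def classify_affected_graph(graph_storage, chunk_ids: set[str]):
    """Hypothetical helper mirroring the batch logic used during deletion."""
    # One batched call per collection instead of walking the whole graph.
    affected_nodes = await graph_storage.get_nodes_by_chunk_ids(list(chunk_ids))
    affected_edges = await graph_storage.get_edges_by_chunk_ids(list(chunk_ids))

    entities_to_delete, entities_to_rebuild = set(), {}
    for node in affected_nodes:
        remaining = set(node["source_id"].split(GRAPH_FIELD_SEP)) - chunk_ids
        if not remaining:
            entities_to_delete.add(node["id"])  # no surviving chunks: drop entity
        else:
            entities_to_rebuild[node["id"]] = remaining  # rebuild from what's left

    relationships_to_delete, relationships_to_rebuild = set(), {}
    for edge in affected_edges:
        pair = tuple(sorted((edge["source"], edge["target"])))
        remaining = set(edge["source_id"].split(GRAPH_FIELD_SEP)) - chunk_ids
        if not remaining:
            relationships_to_delete.add(pair)
        else:
            relationships_to_rebuild[pair] = remaining

    return (
        entities_to_delete,
        entities_to_rebuild,
        relationships_to_delete,
        relationships_to_rebuild,
    )
```

Because each backend answers the chunk-membership question natively (NetworkX scans its in-memory graph once, Neo4j and Apache AGE push the filter into a single UNWIND query), deletion cost now scales with the number of affected nodes and edges rather than the size of the whole knowledge graph.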
lightrag/base.py
CHANGED
@@ -14,6 +14,7 @@ from typing import (
 )
 from .utils import EmbeddingFunc
 from .types import KnowledgeGraph
+from .constants import GRAPH_FIELD_SEP
 
 # use the .env that is inside the current folder
 # allows to use different .env file for each lightrag instance
@@ -456,6 +457,67 @@ class BaseGraphStorage(StorageNameSpace, ABC):
             result[node_id] = edges if edges is not None else []
         return result
 
+    @abstractmethod
+    async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        """Get all nodes that are associated with the given chunk_ids.
+
+        Args:
+            chunk_ids (list[str]): A list of chunk IDs to find associated nodes for.
+
+        Returns:
+            list[dict]: A list of nodes, where each node is a dictionary of its properties.
+                An empty list if no matching nodes are found.
+        """
+        # Default implementation iterates through all nodes, which is inefficient.
+        # This method should be overridden by subclasses for better performance.
+        all_nodes = []
+        all_labels = await self.get_all_labels()
+        for label in all_labels:
+            node = await self.get_node(label)
+            if node and "source_id" in node:
+                source_ids = set(node["source_id"].split(GRAPH_FIELD_SEP))
+                if not source_ids.isdisjoint(chunk_ids):
+                    all_nodes.append(node)
+        return all_nodes
+
+    @abstractmethod
+    async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        """Get all edges that are associated with the given chunk_ids.
+
+        Args:
+            chunk_ids (list[str]): A list of chunk IDs to find associated edges for.
+
+        Returns:
+            list[dict]: A list of edges, where each edge is a dictionary of its properties.
+                An empty list if no matching edges are found.
+        """
+        # Default implementation iterates through all nodes and their edges, which is inefficient.
+        # This method should be overridden by subclasses for better performance.
+        all_edges = []
+        all_labels = await self.get_all_labels()
+        processed_edges = set()
+
+        for label in all_labels:
+            edges = await self.get_node_edges(label)
+            if edges:
+                for src_id, tgt_id in edges:
+                    # Avoid processing the same edge twice in an undirected graph
+                    edge_tuple = tuple(sorted((src_id, tgt_id)))
+                    if edge_tuple in processed_edges:
+                        continue
+                    processed_edges.add(edge_tuple)
+
+                    edge = await self.get_edge(src_id, tgt_id)
+                    if edge and "source_id" in edge:
+                        source_ids = set(edge["source_id"].split(GRAPH_FIELD_SEP))
+                        if not source_ids.isdisjoint(chunk_ids):
+                            # Add source and target to the edge dict for easier processing later
+                            edge_with_nodes = edge.copy()
+                            edge_with_nodes["source"] = src_id
+                            edge_with_nodes["target"] = tgt_id
+                            all_edges.append(edge_with_nodes)
+        return all_edges
+
     @abstractmethod
     async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
         """Insert a new node or update an existing node in the graph.
lightrag/constants.py
CHANGED
@@ -12,6 +12,9 @@ DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 6
 DEFAULT_WOKERS = 2
 DEFAULT_TIMEOUT = 150
 
+# Separator for graph fields
+GRAPH_FIELD_SEP = "<SEP>"
+
 # Logging configuration defaults
 DEFAULT_LOG_MAX_BYTES = 10485760  # Default 10MB
 DEFAULT_LOG_BACKUP_COUNT = 5  # Default 5 backups
lightrag/kg/neo4j_impl.py
CHANGED
@@ -16,6 +16,7 @@ import logging
 from ..utils import logger
 from ..base import BaseGraphStorage
 from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
+from ..constants import GRAPH_FIELD_SEP
 import pipmaster as pm
 
 if not pm.is_installed("neo4j"):
@@ -725,6 +726,47 @@ class Neo4JStorage(BaseGraphStorage):
             await result.consume()  # Ensure results are fully consumed
         return edges_dict
 
+    async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        async with self._driver.session(
+            database=self._DATABASE, default_access_mode="READ"
+        ) as session:
+            query = """
+            UNWIND $chunk_ids AS chunk_id
+            MATCH (n:base)
+            WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep)
+            RETURN DISTINCT n
+            """
+            result = await session.run(query, chunk_ids=chunk_ids, sep=GRAPH_FIELD_SEP)
+            nodes = []
+            async for record in result:
+                node = record["n"]
+                node_dict = dict(node)
+                # Add node id (entity_id) to the dictionary for easier access
+                node_dict["id"] = node_dict.get("entity_id")
+                nodes.append(node_dict)
+            await result.consume()
+            return nodes
+
+    async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        async with self._driver.session(
+            database=self._DATABASE, default_access_mode="READ"
+        ) as session:
+            query = """
+            UNWIND $chunk_ids AS chunk_id
+            MATCH (a:base)-[r]-(b:base)
+            WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep)
+            RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
+            """
+            result = await session.run(query, chunk_ids=chunk_ids, sep=GRAPH_FIELD_SEP)
+            edges = []
+            async for record in result:
+                edge_properties = record["properties"]
+                edge_properties["source"] = record["source"]
+                edge_properties["target"] = record["target"]
+                edges.append(edge_properties)
+            await result.consume()
+            return edges
+
     @retry(
         stop=stop_after_attempt(3),
         wait=wait_exponential(multiplier=1, min=4, max=10),
lightrag/kg/networkx_impl.py
CHANGED
@@ -5,6 +5,7 @@ from typing import final
 from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
 from lightrag.utils import logger
 from lightrag.base import BaseGraphStorage
+from lightrag.constants import GRAPH_FIELD_SEP
 
 import pipmaster as pm
 
@@ -357,6 +358,33 @@ class NetworkXStorage(BaseGraphStorage):
         )
         return result
 
+    async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        chunk_ids_set = set(chunk_ids)
+        graph = await self._get_graph()
+        matching_nodes = []
+        for node_id, node_data in graph.nodes(data=True):
+            if "source_id" in node_data:
+                node_source_ids = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                if not node_source_ids.isdisjoint(chunk_ids_set):
+                    node_data_with_id = node_data.copy()
+                    node_data_with_id["id"] = node_id
+                    matching_nodes.append(node_data_with_id)
+        return matching_nodes
+
+    async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        chunk_ids_set = set(chunk_ids)
+        graph = await self._get_graph()
+        matching_edges = []
+        for u, v, edge_data in graph.edges(data=True):
+            if "source_id" in edge_data:
+                edge_source_ids = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                if not edge_source_ids.isdisjoint(chunk_ids_set):
+                    edge_data_with_nodes = edge_data.copy()
+                    edge_data_with_nodes["source"] = u
+                    edge_data_with_nodes["target"] = v
+                    matching_edges.append(edge_data_with_nodes)
+        return matching_edges
+
     async def index_done_callback(self) -> bool:
         """Save data to disk"""
         async with self._storage_lock:
lightrag/kg/postgres_impl.py
CHANGED
@@ -27,6 +27,7 @@ from ..base import (
 )
 from ..namespace import NameSpace, is_namespace
 from ..utils import logger
+from ..constants import GRAPH_FIELD_SEP
 
 import pipmaster as pm
 
@@ -1422,8 +1423,6 @@ class PGGraphStorage(BaseGraphStorage):
         # Process string result, parse it to JSON dictionary
         if isinstance(node_dict, str):
             try:
-                import json
-
                 node_dict = json.loads(node_dict)
             except json.JSONDecodeError:
                 logger.warning(f"Failed to parse node string: {node_dict}")
@@ -1479,8 +1478,6 @@ class PGGraphStorage(BaseGraphStorage):
         # Process string result, parse it to JSON dictionary
         if isinstance(result, str):
             try:
-                import json
-
                 result = json.loads(result)
             except json.JSONDecodeError:
                 logger.warning(f"Failed to parse edge string: {result}")
@@ -1697,8 +1694,6 @@ class PGGraphStorage(BaseGraphStorage):
         # Process string result, parse it to JSON dictionary
         if isinstance(node_dict, str):
             try:
-                import json
-
                 node_dict = json.loads(node_dict)
             except json.JSONDecodeError:
                 logger.warning(
@@ -1861,8 +1856,6 @@ class PGGraphStorage(BaseGraphStorage):
         # Process string result, parse it to JSON dictionary
         if isinstance(edge_props, str):
             try:
-                import json
-
                 edge_props = json.loads(edge_props)
             except json.JSONDecodeError:
                 logger.warning(
@@ -1879,8 +1872,6 @@ class PGGraphStorage(BaseGraphStorage):
         # Process string result, parse it to JSON dictionary
         if isinstance(edge_props, str):
             try:
-                import json
-
                 edge_props = json.loads(edge_props)
             except json.JSONDecodeError:
                 logger.warning(
@@ -1975,6 +1966,102 @@ class PGGraphStorage(BaseGraphStorage):
             labels.append(result["label"])
         return labels
 
+    async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        """
+        Retrieves nodes from the graph that are associated with a given list of chunk IDs.
+        This method uses a Cypher query with UNWIND to efficiently find all nodes
+        where the `source_id` property contains any of the specified chunk IDs.
+        """
+        # The string representation of the list for the cypher query
+        chunk_ids_str = json.dumps(chunk_ids)
+
+        query = f"""
+        SELECT * FROM cypher('{self.graph_name}', $$
+            UNWIND {chunk_ids_str} AS chunk_id
+            MATCH (n:base)
+            WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, '{GRAPH_FIELD_SEP}')
+            RETURN n
+        $$) AS (n agtype);
+        """
+        results = await self._query(query)
+
+        # Build result list
+        nodes = []
+        for result in results:
+            if result["n"]:
+                node_dict = result["n"]["properties"]
+
+                # Process string result, parse it to JSON dictionary
+                if isinstance(node_dict, str):
+                    try:
+                        node_dict = json.loads(node_dict)
+                    except json.JSONDecodeError:
+                        logger.warning(
+                            f"Failed to parse node string in batch: {node_dict}"
+                        )
+
+                node_dict["id"] = node_dict["entity_id"]
+                nodes.append(node_dict)
+
+        return nodes
+
+    async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
+        """
+        Retrieves edges from the graph that are associated with a given list of chunk IDs.
+        This method uses a Cypher query with UNWIND to efficiently find all edges
+        where the `source_id` property contains any of the specified chunk IDs.
+        """
+        chunk_ids_str = json.dumps(chunk_ids)
+
+        query = f"""
+        SELECT * FROM cypher('{self.graph_name}', $$
+            UNWIND {chunk_ids_str} AS chunk_id
+            MATCH (a:base)-[r]-(b:base)
+            WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, '{GRAPH_FIELD_SEP}')
+            RETURN DISTINCT r, startNode(r) AS source, endNode(r) AS target
+        $$) AS (edge agtype, source agtype, target agtype);
+        """
+        results = await self._query(query)
+        edges = []
+        if results:
+            for item in results:
+                edge_agtype = item["edge"]["properties"]
+                # Process string result, parse it to JSON dictionary
+                if isinstance(edge_agtype, str):
+                    try:
+                        edge_agtype = json.loads(edge_agtype)
+                    except json.JSONDecodeError:
+                        logger.warning(
+                            f"Failed to parse edge string in batch: {edge_agtype}"
+                        )
+
+                source_agtype = item["source"]["properties"]
+                # Process string result, parse it to JSON dictionary
+                if isinstance(source_agtype, str):
+                    try:
+                        source_agtype = json.loads(source_agtype)
+                    except json.JSONDecodeError:
+                        logger.warning(
+                            f"Failed to parse node string in batch: {source_agtype}"
+                        )
+
+                target_agtype = item["target"]["properties"]
+                # Process string result, parse it to JSON dictionary
+                if isinstance(target_agtype, str):
+                    try:
+                        target_agtype = json.loads(target_agtype)
+                    except json.JSONDecodeError:
+                        logger.warning(
+                            f"Failed to parse node string in batch: {target_agtype}"
+                        )
+
+                if edge_agtype and source_agtype and target_agtype:
+                    edge_properties = edge_agtype
+                    edge_properties["source"] = source_agtype["entity_id"]
+                    edge_properties["target"] = target_agtype["entity_id"]
+                    edges.append(edge_properties)
+        return edges
+
     async def _bfs_subgraph(
         self, node_label: str, max_depth: int, max_nodes: int
     ) -> KnowledgeGraph:
lightrag/lightrag.py
CHANGED
@@ -60,7 +60,7 @@ from .operate import (
     query_with_keywords,
     _rebuild_knowledge_from_chunks,
 )
-from .prompt import GRAPH_FIELD_SEP
+from .constants import GRAPH_FIELD_SEP
 from .utils import (
     Tokenizer,
     TiktokenTokenizer,
@@ -1761,68 +1761,54 @@ class LightRAG:
         # Use graph database lock to ensure atomic merges and updates
         graph_db_lock = get_graph_db_lock(enable_logging=False)
         async with graph_db_lock:
+            # Get all affected nodes and edges in batch
+            affected_nodes = (
+                await self.chunk_entity_relation_graph.get_nodes_by_chunk_ids(
+                    list(chunk_ids)
+                )
+            )
+            affected_edges = (
+                await self.chunk_entity_relation_graph.get_edges_by_chunk_ids(
+                    list(chunk_ids)
                 )
+            )
+
+            # logger.info(f"chunk_ids: {chunk_ids}")
+            # logger.info(f"affected_nodes: {affected_nodes}")
+            # logger.info(f"affected_edges: {affected_edges}")
+
+            # Process entities
+            for node_data in affected_nodes:
+                node_label = node_data.get("entity_id")
+                if node_label and "source_id" in node_data:
                     sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
                     remaining_sources = sources - chunk_ids
 
                     if not remaining_sources:
                         entities_to_delete.add(node_label)
-                        logger.debug(
-                            f"Entity {node_label} marked for deletion - no remaining sources"
-                        )
                     elif remaining_sources != sources:
-                        # Entity needs to be rebuilt from remaining chunks
                        entities_to_rebuild[node_label] = remaining_sources
-                        logger.debug(
-                            f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks"
-                        )
 
             # Process relationships
-                        sources = set(
-                            edge_data["source_id"].split(GRAPH_FIELD_SEP)
-                        )
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            relationships_to_delete.add((src, tgt))
-                            logger.debug(
-                                f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
-                            )
-                        elif remaining_sources != sources:
-                            # Relationship needs to be rebuilt from remaining chunks
-                            relationships_to_rebuild[(src, tgt)] = (
-                                remaining_sources
-                            )
-                            logger.debug(
-                                f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks"
-                            )
+            for edge_data in affected_edges:
+                src = edge_data.get("source")
+                tgt = edge_data.get("target")
+
+                if src and tgt and "source_id" in edge_data:
+                    edge_tuple = tuple(sorted((src, tgt)))
+                    if (
+                        edge_tuple in relationships_to_delete
+                        or edge_tuple in relationships_to_rebuild
+                    ):
+                        continue
+
+                    sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                    remaining_sources = sources - chunk_ids
+
+                    if not remaining_sources:
+                        relationships_to_delete.add(edge_tuple)
+                    elif remaining_sources != sources:
+                        relationships_to_rebuild[edge_tuple] = remaining_sources
 
             # 5. Delete chunks from storage
             if chunk_ids:
lightrag/operate.py
CHANGED
@@ -33,7 +33,8 @@ from .base import (
     TextChunkSchema,
     QueryParam,
 )
-from .prompt import GRAPH_FIELD_SEP, PROMPTS
+from .prompt import PROMPTS
+from .constants import GRAPH_FIELD_SEP
 import time
 from dotenv import load_dotenv
 
lightrag/prompt.py
CHANGED
@@ -1,7 +1,6 @@
 from __future__ import annotations
 from typing import Any
 
-GRAPH_FIELD_SEP = "<SEP>"
 
 PROMPTS: dict[str, Any] = {}
 