yangdx commited on
Commit
284f404
·
1 Parent(s): ecaddc9

Add graph_db_lock to esure consistency across multiple processes for node and edge edition jobs

Browse files
Files changed (1) hide show
  1. lightrag/lightrag.py +555 -536
lightrag/lightrag.py CHANGED
@@ -11,12 +11,18 @@ from functools import partial
11
  from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
12
  import pandas as pd
13
 
 
14
 
15
  from lightrag.kg import (
16
  STORAGES,
17
  verify_storage_implementation,
18
  )
19
 
 
 
 
 
 
20
  from .base import (
21
  BaseGraphStorage,
22
  BaseKVStorage,
@@ -779,10 +785,6 @@ class LightRAG:
779
  3. Process each chunk for entity and relation extraction
780
  4. Update the document status
781
  """
782
- from lightrag.kg.shared_storage import (
783
- get_namespace_data,
784
- get_pipeline_status_lock,
785
- )
786
 
787
  # Get pipeline status shared data and lock
788
  pipeline_status = await get_namespace_data("pipeline_status")
@@ -1431,19 +1433,21 @@ class LightRAG:
1431
  loop = always_get_an_event_loop()
1432
  return loop.run_until_complete(self.adelete_by_entity(entity_name))
1433
 
1434
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
1435
  async def adelete_by_entity(self, entity_name: str) -> None:
1436
- try:
1437
- await self.entities_vdb.delete_entity(entity_name)
1438
- await self.relationships_vdb.delete_entity_relation(entity_name)
1439
- await self.chunk_entity_relation_graph.delete_node(entity_name)
 
 
 
1440
 
1441
- logger.info(
1442
- f"Entity '{entity_name}' and its relationships have been deleted."
1443
- )
1444
- await self._delete_by_entity_done()
1445
- except Exception as e:
1446
- logger.error(f"Error while deleting entity '{entity_name}': {e}")
1447
 
1448
  async def _delete_by_entity_done(self) -> None:
1449
  await asyncio.gather(
@@ -1469,7 +1473,6 @@ class LightRAG:
1469
  self.adelete_by_relation(source_entity, target_entity)
1470
  )
1471
 
1472
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
1473
  async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
1474
  """Asynchronously delete a relation between two entities.
1475
 
@@ -1477,37 +1480,40 @@ class LightRAG:
1477
  source_entity: Name of the source entity
1478
  target_entity: Name of the target entity
1479
  """
1480
- try:
1481
- # TODO: check if has_edge function works on reverse relation
1482
- # Check if the relation exists
1483
- edge_exists = await self.chunk_entity_relation_graph.has_edge(
1484
- source_entity, target_entity
1485
- )
1486
- if not edge_exists:
1487
- logger.warning(
1488
- f"Relation from '{source_entity}' to '{target_entity}' does not exist"
1489
  )
1490
- return
 
 
 
 
1491
 
1492
- # Delete relation from vector database
1493
- relation_id = compute_mdhash_id(
1494
- source_entity + target_entity, prefix="rel-"
1495
- )
1496
- await self.relationships_vdb.delete([relation_id])
1497
 
1498
- # Delete relation from knowledge graph
1499
- await self.chunk_entity_relation_graph.remove_edges(
1500
- [(source_entity, target_entity)]
1501
- )
1502
 
1503
- logger.info(
1504
- f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
1505
- )
1506
- await self._delete_relation_done()
1507
- except Exception as e:
1508
- logger.error(
1509
- f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
1510
- )
1511
 
1512
  async def _delete_relation_done(self) -> None:
1513
  """Callback after relation deletion is complete"""
@@ -1539,7 +1545,8 @@ class LightRAG:
1539
  """
1540
  return await self.doc_status.get_docs_by_status(status)
1541
 
1542
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
 
1543
  async def adelete_by_doc_id(self, doc_id: str) -> None:
1544
  """Delete a document and all its related data
1545
 
@@ -1898,7 +1905,6 @@ class LightRAG:
1898
  """Synchronous version of aclear_cache."""
1899
  return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes))
1900
 
1901
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
1902
  async def aedit_entity(
1903
  self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
1904
  ) -> dict[str, Any]:
@@ -1914,169 +1920,172 @@ class LightRAG:
1914
  Returns:
1915
  Dictionary containing updated entity information
1916
  """
1917
- try:
1918
- # 1. Get current entity information
1919
- node_exists = await self.chunk_entity_relation_graph.has_node(entity_name)
1920
- if not node_exists:
1921
- raise ValueError(f"Entity '{entity_name}' does not exist")
1922
- node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
1923
-
1924
- # Check if entity is being renamed
1925
- new_entity_name = updated_data.get("entity_name", entity_name)
1926
- is_renaming = new_entity_name != entity_name
1927
-
1928
- # If renaming, check if new name already exists
1929
- if is_renaming:
1930
- if not allow_rename:
1931
- raise ValueError(
1932
- "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
1933
- )
1934
 
1935
- existing_node = await self.chunk_entity_relation_graph.has_node(
1936
- new_entity_name
1937
- )
1938
- if existing_node:
1939
- raise ValueError(
1940
- f"Entity name '{new_entity_name}' already exists, cannot rename"
 
 
 
 
 
 
 
1941
  )
 
 
 
 
1942
 
1943
- # 2. Update entity information in the graph
1944
- new_node_data = {**node_data, **updated_data}
1945
- new_node_data["entity_id"] = new_entity_name
1946
 
1947
- if "entity_name" in new_node_data:
1948
- del new_node_data[
1949
- "entity_name"
1950
- ] # Node data should not contain entity_name field
1951
 
1952
- # If renaming entity
1953
- if is_renaming:
1954
- logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'")
1955
 
1956
- # Create new entity
1957
- await self.chunk_entity_relation_graph.upsert_node(
1958
- new_entity_name, new_node_data
1959
- )
1960
 
1961
- # Store relationships that need to be updated
1962
- relations_to_update = []
1963
- relations_to_delete = []
1964
- # Get all edges related to the original entity
1965
- edges = await self.chunk_entity_relation_graph.get_node_edges(
1966
- entity_name
1967
- )
1968
- if edges:
1969
- # Recreate edges for the new entity
1970
- for source, target in edges:
1971
- edge_data = await self.chunk_entity_relation_graph.get_edge(
1972
- source, target
1973
- )
1974
- if edge_data:
1975
- relations_to_delete.append(
1976
- compute_mdhash_id(source + target, prefix="rel-")
1977
- )
1978
- relations_to_delete.append(
1979
- compute_mdhash_id(target + source, prefix="rel-")
1980
  )
1981
- if source == entity_name:
1982
- await self.chunk_entity_relation_graph.upsert_edge(
1983
- new_entity_name, target, edge_data
1984
- )
1985
- relations_to_update.append(
1986
- (new_entity_name, target, edge_data)
1987
  )
1988
- else: # target == entity_name
1989
- await self.chunk_entity_relation_graph.upsert_edge(
1990
- source, new_entity_name, edge_data
1991
  )
1992
- relations_to_update.append(
1993
- (source, new_entity_name, edge_data)
1994
- )
1995
-
1996
- # Delete old entity
1997
- await self.chunk_entity_relation_graph.delete_node(entity_name)
1998
-
1999
- # Delete old entity record from vector database
2000
- old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2001
- await self.entities_vdb.delete([old_entity_id])
2002
- logger.info(
2003
- f"Deleted old entity '{entity_name}' and its vector embedding from database"
2004
- )
2005
-
2006
- # Delete old relation records from vector database
2007
- await self.relationships_vdb.delete(relations_to_delete)
2008
- logger.info(
2009
- f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
2010
- )
2011
-
2012
- # Update relationship vector representations
2013
- for src, tgt, edge_data in relations_to_update:
2014
- description = edge_data.get("description", "")
2015
- keywords = edge_data.get("keywords", "")
2016
- source_id = edge_data.get("source_id", "")
2017
- weight = float(edge_data.get("weight", 1.0))
2018
-
2019
- # Create new content for embedding
2020
- content = f"{src}\t{tgt}\n{keywords}\n{description}"
2021
 
2022
- # Calculate relationship ID
2023
- relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
 
 
 
2024
 
2025
- # Prepare data for vector database update
2026
- relation_data = {
2027
- relation_id: {
2028
- "content": content,
2029
- "src_id": src,
2030
- "tgt_id": tgt,
2031
- "source_id": source_id,
2032
- "description": description,
2033
- "keywords": keywords,
2034
- "weight": weight,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2035
  }
2036
- }
2037
 
2038
- # Update vector database
2039
- await self.relationships_vdb.upsert(relation_data)
2040
 
2041
- # Update working entity name to new name
2042
- entity_name = new_entity_name
2043
- else:
2044
- # If not renaming, directly update node data
2045
- await self.chunk_entity_relation_graph.upsert_node(
2046
- entity_name, new_node_data
2047
- )
2048
 
2049
- # 3. Recalculate entity's vector representation and update vector database
2050
- description = new_node_data.get("description", "")
2051
- source_id = new_node_data.get("source_id", "")
2052
- entity_type = new_node_data.get("entity_type", "")
2053
- content = entity_name + "\n" + description
2054
 
2055
- # Calculate entity ID
2056
- entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2057
 
2058
- # Prepare data for vector database update
2059
- entity_data = {
2060
- entity_id: {
2061
- "content": content,
2062
- "entity_name": entity_name,
2063
- "source_id": source_id,
2064
- "description": description,
2065
- "entity_type": entity_type,
 
2066
  }
2067
- }
2068
 
2069
- # Update vector database
2070
- await self.entities_vdb.upsert(entity_data)
2071
 
2072
- # 4. Save changes
2073
- await self._edit_entity_done()
2074
 
2075
- logger.info(f"Entity '{entity_name}' successfully updated")
2076
- return await self.get_entity_info(entity_name, include_vector_data=True)
2077
- except Exception as e:
2078
- logger.error(f"Error while editing entity '{entity_name}': {e}")
2079
- raise
2080
 
2081
  def edit_entity(
2082
  self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
@@ -2111,7 +2120,6 @@ class LightRAG:
2111
  ]
2112
  )
2113
 
2114
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
2115
  async def aedit_relation(
2116
  self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
2117
  ) -> dict[str, Any]:
@@ -2127,77 +2135,80 @@ class LightRAG:
2127
  Returns:
2128
  Dictionary containing updated relation information
2129
  """
2130
- try:
2131
- # 1. Get current relation information
2132
- edge_exists = await self.chunk_entity_relation_graph.has_edge(
2133
- source_entity, target_entity
2134
- )
2135
- if not edge_exists:
2136
- raise ValueError(
2137
- f"Relation from '{source_entity}' to '{target_entity}' does not exist"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2138
  )
2139
- edge_data = await self.chunk_entity_relation_graph.get_edge(
2140
- source_entity, target_entity
2141
- )
2142
- # Important: First delete the old relation record from the vector database
2143
- old_relation_id = compute_mdhash_id(
2144
- source_entity + target_entity, prefix="rel-"
2145
- )
2146
- await self.relationships_vdb.delete([old_relation_id])
2147
- logger.info(
2148
- f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
2149
- )
2150
 
2151
- # 2. Update relation information in the graph
2152
- new_edge_data = {**edge_data, **updated_data}
2153
- await self.chunk_entity_relation_graph.upsert_edge(
2154
- source_entity, target_entity, new_edge_data
2155
- )
2156
 
2157
- # 3. Recalculate relation's vector representation and update vector database
2158
- description = new_edge_data.get("description", "")
2159
- keywords = new_edge_data.get("keywords", "")
2160
- source_id = new_edge_data.get("source_id", "")
2161
- weight = float(new_edge_data.get("weight", 1.0))
2162
 
2163
- # Create content for embedding
2164
- content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}"
2165
 
2166
- # Calculate relation ID
2167
- relation_id = compute_mdhash_id(
2168
- source_entity + target_entity, prefix="rel-"
2169
- )
2170
 
2171
- # Prepare data for vector database update
2172
- relation_data = {
2173
- relation_id: {
2174
- "content": content,
2175
- "src_id": source_entity,
2176
- "tgt_id": target_entity,
2177
- "source_id": source_id,
2178
- "description": description,
2179
- "keywords": keywords,
2180
- "weight": weight,
 
2181
  }
2182
- }
2183
 
2184
- # Update vector database
2185
- await self.relationships_vdb.upsert(relation_data)
2186
 
2187
- # 4. Save changes
2188
- await self._edit_relation_done()
2189
 
2190
- logger.info(
2191
- f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
2192
- )
2193
- return await self.get_relation_info(
2194
- source_entity, target_entity, include_vector_data=True
2195
- )
2196
- except Exception as e:
2197
- logger.error(
2198
- f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}"
2199
- )
2200
- raise
2201
 
2202
  def edit_relation(
2203
  self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
@@ -2245,54 +2256,57 @@ class LightRAG:
2245
  Returns:
2246
  Dictionary containing created entity information
2247
  """
2248
- try:
2249
- # Check if entity already exists
2250
- existing_node = await self.chunk_entity_relation_graph.has_node(entity_name)
2251
- if existing_node:
2252
- raise ValueError(f"Entity '{entity_name}' already exists")
2253
-
2254
- # Prepare node data with defaults if missing
2255
- node_data = {
2256
- "entity_id": entity_name,
2257
- "entity_type": entity_data.get("entity_type", "UNKNOWN"),
2258
- "description": entity_data.get("description", ""),
2259
- "source_id": entity_data.get("source_id", "manual"),
2260
- }
2261
 
2262
- # Add entity to knowledge graph
2263
- await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data)
 
 
 
 
 
2264
 
2265
- # Prepare content for entity
2266
- description = node_data.get("description", "")
2267
- source_id = node_data.get("source_id", "")
2268
- entity_type = node_data.get("entity_type", "")
2269
- content = entity_name + "\n" + description
2270
 
2271
- # Calculate entity ID
2272
- entity_id = compute_mdhash_id(entity_name, prefix="ent-")
 
 
 
2273
 
2274
- # Prepare data for vector database update
2275
- entity_data_for_vdb = {
2276
- entity_id: {
2277
- "content": content,
2278
- "entity_name": entity_name,
2279
- "source_id": source_id,
2280
- "description": description,
2281
- "entity_type": entity_type,
 
 
 
 
2282
  }
2283
- }
2284
 
2285
- # Update vector database
2286
- await self.entities_vdb.upsert(entity_data_for_vdb)
2287
 
2288
- # Save changes
2289
- await self._edit_entity_done()
2290
 
2291
- logger.info(f"Entity '{entity_name}' successfully created")
2292
- return await self.get_entity_info(entity_name, include_vector_data=True)
2293
- except Exception as e:
2294
- logger.error(f"Error while creating entity '{entity_name}': {e}")
2295
- raise
2296
 
2297
  def create_entity(
2298
  self, entity_name: str, entity_data: dict[str, Any]
@@ -2325,86 +2339,89 @@ class LightRAG:
2325
  Returns:
2326
  Dictionary containing created relation information
2327
  """
2328
- try:
2329
- # Check if both entities exist
2330
- source_exists = await self.chunk_entity_relation_graph.has_node(
2331
- source_entity
2332
- )
2333
- target_exists = await self.chunk_entity_relation_graph.has_node(
2334
- target_entity
2335
- )
 
 
 
2336
 
2337
- if not source_exists:
2338
- raise ValueError(f"Source entity '{source_entity}' does not exist")
2339
- if not target_exists:
2340
- raise ValueError(f"Target entity '{target_entity}' does not exist")
2341
 
2342
- # Check if relation already exists
2343
- existing_edge = await self.chunk_entity_relation_graph.has_edge(
2344
- source_entity, target_entity
2345
- )
2346
- if existing_edge:
2347
- raise ValueError(
2348
- f"Relation from '{source_entity}' to '{target_entity}' already exists"
2349
  )
 
 
 
 
2350
 
2351
- # Prepare edge data with defaults if missing
2352
- edge_data = {
2353
- "description": relation_data.get("description", ""),
2354
- "keywords": relation_data.get("keywords", ""),
2355
- "source_id": relation_data.get("source_id", "manual"),
2356
- "weight": float(relation_data.get("weight", 1.0)),
2357
- }
2358
 
2359
- # Add relation to knowledge graph
2360
- await self.chunk_entity_relation_graph.upsert_edge(
2361
- source_entity, target_entity, edge_data
2362
- )
2363
 
2364
- # Prepare content for embedding
2365
- description = edge_data.get("description", "")
2366
- keywords = edge_data.get("keywords", "")
2367
- source_id = edge_data.get("source_id", "")
2368
- weight = edge_data.get("weight", 1.0)
2369
 
2370
- # Create content for embedding
2371
- content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
2372
 
2373
- # Calculate relation ID
2374
- relation_id = compute_mdhash_id(
2375
- source_entity + target_entity, prefix="rel-"
2376
- )
2377
 
2378
- # Prepare data for vector database update
2379
- relation_data_for_vdb = {
2380
- relation_id: {
2381
- "content": content,
2382
- "src_id": source_entity,
2383
- "tgt_id": target_entity,
2384
- "source_id": source_id,
2385
- "description": description,
2386
- "keywords": keywords,
2387
- "weight": weight,
 
2388
  }
2389
- }
2390
 
2391
- # Update vector database
2392
- await self.relationships_vdb.upsert(relation_data_for_vdb)
2393
 
2394
- # Save changes
2395
- await self._edit_relation_done()
2396
 
2397
- logger.info(
2398
- f"Relation from '{source_entity}' to '{target_entity}' successfully created"
2399
- )
2400
- return await self.get_relation_info(
2401
- source_entity, target_entity, include_vector_data=True
2402
- )
2403
- except Exception as e:
2404
- logger.error(
2405
- f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}"
2406
- )
2407
- raise
2408
 
2409
  def create_relation(
2410
  self, source_entity: str, target_entity: str, relation_data: dict[str, Any]
@@ -2426,7 +2443,6 @@ class LightRAG:
2426
  self.acreate_relation(source_entity, target_entity, relation_data)
2427
  )
2428
 
2429
- # TODO: Lock all KG relative DB to esure consistency across multiple processes
2430
  async def amerge_entities(
2431
  self,
2432
  source_entities: list[str],
@@ -2454,221 +2470,224 @@ class LightRAG:
2454
  Returns:
2455
  Dictionary containing the merged entity information
2456
  """
2457
- try:
2458
- # Default merge strategy
2459
- default_strategy = {
2460
- "description": "concatenate",
2461
- "entity_type": "keep_first",
2462
- "source_id": "join_unique",
2463
- }
2464
-
2465
- merge_strategy = (
2466
- default_strategy
2467
- if merge_strategy is None
2468
- else {**default_strategy, **merge_strategy}
2469
- )
2470
- target_entity_data = (
2471
- {} if target_entity_data is None else target_entity_data
2472
- )
2473
 
2474
- # 1. Check if all source entities exist
2475
- source_entities_data = {}
2476
- for entity_name in source_entities:
2477
- node_exists = await self.chunk_entity_relation_graph.has_node(
2478
- entity_name
 
 
2479
  )
2480
- if not node_exists:
2481
- raise ValueError(f"Source entity '{entity_name}' does not exist")
2482
- node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
2483
- source_entities_data[entity_name] = node_data
2484
 
2485
- # 2. Check if target entity exists and get its data if it does
2486
- target_exists = await self.chunk_entity_relation_graph.has_node(
2487
- target_entity
2488
- )
2489
- existing_target_entity_data = {}
2490
- if target_exists:
2491
- existing_target_entity_data = (
2492
- await self.chunk_entity_relation_graph.get_node(target_entity)
 
 
 
 
 
 
2493
  )
2494
- logger.info(
2495
- f"Target entity '{target_entity}' already exists, will merge data"
 
 
 
 
 
 
 
 
 
 
 
 
2496
  )
2497
 
2498
- # 3. Merge entity data
2499
- merged_entity_data = self._merge_entity_attributes(
2500
- list(source_entities_data.values())
2501
- + ([existing_target_entity_data] if target_exists else []),
2502
- merge_strategy,
2503
- )
2504
 
2505
- # Apply any explicitly provided target entity data (overrides merged data)
2506
- for key, value in target_entity_data.items():
2507
- merged_entity_data[key] = value
 
 
 
 
 
 
 
 
 
 
 
 
2508
 
2509
- # 4. Get all relationships of the source entities
2510
- all_relations = []
2511
- for entity_name in source_entities:
2512
- # Get all relationships of the source entities
2513
- edges = await self.chunk_entity_relation_graph.get_node_edges(
2514
- entity_name
2515
- )
2516
- if edges:
2517
- for src, tgt in edges:
2518
- # Ensure src is the current entity
2519
- if src == entity_name:
2520
- edge_data = await self.chunk_entity_relation_graph.get_edge(
2521
- src, tgt
2522
- )
2523
- all_relations.append((src, tgt, edge_data))
2524
 
2525
- # 5. Create or update the target entity
2526
- merged_entity_data["entity_id"] = target_entity
2527
- if not target_exists:
2528
- await self.chunk_entity_relation_graph.upsert_node(
2529
- target_entity, merged_entity_data
2530
- )
2531
- logger.info(f"Created new target entity '{target_entity}'")
2532
- else:
2533
- await self.chunk_entity_relation_graph.upsert_node(
2534
- target_entity, merged_entity_data
2535
- )
2536
- logger.info(f"Updated existing target entity '{target_entity}'")
2537
 
2538
- # 6. Recreate all relationships, pointing to the target entity
2539
- relation_updates = {} # Track relationships that need to be merged
2540
- relations_to_delete = []
 
 
2541
 
2542
- for src, tgt, edge_data in all_relations:
2543
- relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
2544
- relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
2545
- new_src = target_entity if src in source_entities else src
2546
- new_tgt = target_entity if tgt in source_entities else tgt
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2547
 
2548
- # Skip relationships between source entities to avoid self-loops
2549
- if new_src == new_tgt:
 
 
 
2550
  logger.info(
2551
- f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
2552
  )
2553
- continue
2554
 
2555
- # Check if the same relationship already exists
2556
- relation_key = f"{new_src}|{new_tgt}"
2557
- if relation_key in relation_updates:
2558
- # Merge relationship data
2559
- existing_data = relation_updates[relation_key]["data"]
2560
- merged_relation = self._merge_relation_attributes(
2561
- [existing_data, edge_data],
2562
- {
2563
- "description": "concatenate",
2564
- "keywords": "join_unique",
2565
- "source_id": "join_unique",
2566
- "weight": "max",
2567
- },
2568
- )
2569
- relation_updates[relation_key]["data"] = merged_relation
2570
  logger.info(
2571
- f"Merged duplicate relationship: {new_src} -> {new_tgt}"
2572
  )
2573
- else:
2574
- relation_updates[relation_key] = {
2575
- "src": new_src,
2576
- "tgt": new_tgt,
2577
- "data": edge_data.copy(),
2578
- }
2579
-
2580
- # Apply relationship updates
2581
- for rel_data in relation_updates.values():
2582
- await self.chunk_entity_relation_graph.upsert_edge(
2583
- rel_data["src"], rel_data["tgt"], rel_data["data"]
2584
- )
2585
- logger.info(
2586
- f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
2587
- )
2588
 
2589
- # Delete relationships records from vector database
2590
- await self.relationships_vdb.delete(relations_to_delete)
2591
- logger.info(
2592
- f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
2593
- )
2594
 
2595
- # 7. Update entity vector representation
2596
- description = merged_entity_data.get("description", "")
2597
- source_id = merged_entity_data.get("source_id", "")
2598
- entity_type = merged_entity_data.get("entity_type", "")
2599
- content = target_entity + "\n" + description
2600
-
2601
- entity_id = compute_mdhash_id(target_entity, prefix="ent-")
2602
- entity_data_for_vdb = {
2603
- entity_id: {
2604
- "content": content,
2605
- "entity_name": target_entity,
2606
- "source_id": source_id,
2607
- "description": description,
2608
- "entity_type": entity_type,
2609
  }
2610
- }
2611
 
2612
- await self.entities_vdb.upsert(entity_data_for_vdb)
2613
 
2614
- # 8. Update relationship vector representations
2615
- for rel_data in relation_updates.values():
2616
- src = rel_data["src"]
2617
- tgt = rel_data["tgt"]
2618
- edge_data = rel_data["data"]
2619
 
2620
- description = edge_data.get("description", "")
2621
- keywords = edge_data.get("keywords", "")
2622
- source_id = edge_data.get("source_id", "")
2623
- weight = float(edge_data.get("weight", 1.0))
2624
 
2625
- content = f"{keywords}\t{src}\n{tgt}\n{description}"
2626
- relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
2627
 
2628
- relation_data_for_vdb = {
2629
- relation_id: {
2630
- "content": content,
2631
- "src_id": src,
2632
- "tgt_id": tgt,
2633
- "source_id": source_id,
2634
- "description": description,
2635
- "keywords": keywords,
2636
- "weight": weight,
 
2637
  }
2638
- }
2639
 
2640
- await self.relationships_vdb.upsert(relation_data_for_vdb)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2641
 
2642
- # 9. Delete source entities
2643
- for entity_name in source_entities:
2644
- if entity_name == target_entity:
2645
  logger.info(
2646
- f"Skipping deletion of '{entity_name}' as it's also the target entity"
2647
  )
2648
- continue
2649
 
2650
- # Delete entity node from knowledge graph
2651
- await self.chunk_entity_relation_graph.delete_node(entity_name)
2652
-
2653
- # Delete entity record from vector database
2654
- entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2655
- await self.entities_vdb.delete([entity_id])
2656
 
2657
  logger.info(
2658
- f"Deleted source entity '{entity_name}' and its vector embedding from database"
2659
  )
 
2660
 
2661
- # 10. Save changes
2662
- await self._merge_entities_done()
2663
-
2664
- logger.info(
2665
- f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
2666
- )
2667
- return await self.get_entity_info(target_entity, include_vector_data=True)
2668
-
2669
- except Exception as e:
2670
- logger.error(f"Error merging entities: {e}")
2671
- raise
2672
 
2673
  async def aexport_data(
2674
  self,
 
11
  from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
12
  import pandas as pd
13
 
14
+ from .kg.shared_storage import get_graph_db_lock
15
 
16
  from lightrag.kg import (
17
  STORAGES,
18
  verify_storage_implementation,
19
  )
20
 
21
+ from lightrag.kg.shared_storage import (
22
+ get_namespace_data,
23
+ get_pipeline_status_lock,
24
+ )
25
+
26
  from .base import (
27
  BaseGraphStorage,
28
  BaseKVStorage,
 
785
  3. Process each chunk for entity and relation extraction
786
  4. Update the document status
787
  """
 
 
 
 
788
 
789
  # Get pipeline status shared data and lock
790
  pipeline_status = await get_namespace_data("pipeline_status")
 
1433
  loop = always_get_an_event_loop()
1434
  return loop.run_until_complete(self.adelete_by_entity(entity_name))
1435
 
 
1436
  async def adelete_by_entity(self, entity_name: str) -> None:
1437
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
1438
+ # Use graph database lock to ensure atomic graph and vector db operations
1439
+ async with graph_db_lock:
1440
+ try:
1441
+ await self.entities_vdb.delete_entity(entity_name)
1442
+ await self.relationships_vdb.delete_entity_relation(entity_name)
1443
+ await self.chunk_entity_relation_graph.delete_node(entity_name)
1444
 
1445
+ logger.info(
1446
+ f"Entity '{entity_name}' and its relationships have been deleted."
1447
+ )
1448
+ await self._delete_by_entity_done()
1449
+ except Exception as e:
1450
+ logger.error(f"Error while deleting entity '{entity_name}': {e}")
1451
 
1452
  async def _delete_by_entity_done(self) -> None:
1453
  await asyncio.gather(
 
1473
  self.adelete_by_relation(source_entity, target_entity)
1474
  )
1475
 
 
1476
  async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
1477
  """Asynchronously delete a relation between two entities.
1478
 
 
1480
  source_entity: Name of the source entity
1481
  target_entity: Name of the target entity
1482
  """
1483
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
1484
+ # Use graph database lock to ensure atomic graph and vector db operations
1485
+ async with graph_db_lock:
1486
+ try:
1487
+ # TODO: check if has_edge function works on reverse relation
1488
+ # Check if the relation exists
1489
+ edge_exists = await self.chunk_entity_relation_graph.has_edge(
1490
+ source_entity, target_entity
 
1491
  )
1492
+ if not edge_exists:
1493
+ logger.warning(
1494
+ f"Relation from '{source_entity}' to '{target_entity}' does not exist"
1495
+ )
1496
+ return
1497
 
1498
+ # Delete relation from vector database
1499
+ relation_id = compute_mdhash_id(
1500
+ source_entity + target_entity, prefix="rel-"
1501
+ )
1502
+ await self.relationships_vdb.delete([relation_id])
1503
 
1504
+ # Delete relation from knowledge graph
1505
+ await self.chunk_entity_relation_graph.remove_edges(
1506
+ [(source_entity, target_entity)]
1507
+ )
1508
 
1509
+ logger.info(
1510
+ f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
1511
+ )
1512
+ await self._delete_relation_done()
1513
+ except Exception as e:
1514
+ logger.error(
1515
+ f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
1516
+ )
1517
 
1518
  async def _delete_relation_done(self) -> None:
1519
  """Callback after relation deletion is complete"""
 
1545
  """
1546
  return await self.doc_status.get_docs_by_status(status)
1547
 
1548
+ # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
1549
+ # Document delete is not working properly for most of the storage implementations.
1550
  async def adelete_by_doc_id(self, doc_id: str) -> None:
1551
  """Delete a document and all its related data
1552
 
 
1905
  """Synchronous version of aclear_cache."""
1906
  return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes))
1907
 
 
1908
  async def aedit_entity(
1909
  self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
1910
  ) -> dict[str, Any]:
 
1920
  Returns:
1921
  Dictionary containing updated entity information
1922
  """
1923
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
1924
+ # Use graph database lock to ensure atomic graph and vector db operations
1925
+ async with graph_db_lock:
1926
+ try:
1927
+ # 1. Get current entity information
1928
+ node_exists = await self.chunk_entity_relation_graph.has_node(entity_name)
1929
+ if not node_exists:
1930
+ raise ValueError(f"Entity '{entity_name}' does not exist")
1931
+ node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
 
 
 
 
 
 
 
 
1932
 
1933
+ # Check if entity is being renamed
1934
+ new_entity_name = updated_data.get("entity_name", entity_name)
1935
+ is_renaming = new_entity_name != entity_name
1936
+
1937
+ # If renaming, check if new name already exists
1938
+ if is_renaming:
1939
+ if not allow_rename:
1940
+ raise ValueError(
1941
+ "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
1942
+ )
1943
+
1944
+ existing_node = await self.chunk_entity_relation_graph.has_node(
1945
+ new_entity_name
1946
  )
1947
+ if existing_node:
1948
+ raise ValueError(
1949
+ f"Entity name '{new_entity_name}' already exists, cannot rename"
1950
+ )
1951
 
1952
+ # 2. Update entity information in the graph
1953
+ new_node_data = {**node_data, **updated_data}
1954
+ new_node_data["entity_id"] = new_entity_name
1955
 
1956
+ if "entity_name" in new_node_data:
1957
+ del new_node_data[
1958
+ "entity_name"
1959
+ ] # Node data should not contain entity_name field
1960
 
1961
+ # If renaming entity
1962
+ if is_renaming:
1963
+ logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'")
1964
 
1965
+ # Create new entity
1966
+ await self.chunk_entity_relation_graph.upsert_node(
1967
+ new_entity_name, new_node_data
1968
+ )
1969
 
1970
+ # Store relationships that need to be updated
1971
+ relations_to_update = []
1972
+ relations_to_delete = []
1973
+ # Get all edges related to the original entity
1974
+ edges = await self.chunk_entity_relation_graph.get_node_edges(
1975
+ entity_name
1976
+ )
1977
+ if edges:
1978
+ # Recreate edges for the new entity
1979
+ for source, target in edges:
1980
+ edge_data = await self.chunk_entity_relation_graph.get_edge(
1981
+ source, target
 
 
 
 
 
 
 
1982
  )
1983
+ if edge_data:
1984
+ relations_to_delete.append(
1985
+ compute_mdhash_id(source + target, prefix="rel-")
 
 
 
1986
  )
1987
+ relations_to_delete.append(
1988
+ compute_mdhash_id(target + source, prefix="rel-")
 
1989
  )
1990
+ if source == entity_name:
1991
+ await self.chunk_entity_relation_graph.upsert_edge(
1992
+ new_entity_name, target, edge_data
1993
+ )
1994
+ relations_to_update.append(
1995
+ (new_entity_name, target, edge_data)
1996
+ )
1997
+ else: # target == entity_name
1998
+ await self.chunk_entity_relation_graph.upsert_edge(
1999
+ source, new_entity_name, edge_data
2000
+ )
2001
+ relations_to_update.append(
2002
+ (source, new_entity_name, edge_data)
2003
+ )
2004
+
2005
+ # Delete old entity
2006
+ await self.chunk_entity_relation_graph.delete_node(entity_name)
2007
+
2008
+ # Delete old entity record from vector database
2009
+ old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2010
+ await self.entities_vdb.delete([old_entity_id])
2011
+ logger.info(
2012
+ f"Deleted old entity '{entity_name}' and its vector embedding from database"
2013
+ )
 
 
 
 
 
2014
 
2015
+ # Delete old relation records from vector database
2016
+ await self.relationships_vdb.delete(relations_to_delete)
2017
+ logger.info(
2018
+ f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
2019
+ )
2020
 
2021
+ # Update relationship vector representations
2022
+ for src, tgt, edge_data in relations_to_update:
2023
+ description = edge_data.get("description", "")
2024
+ keywords = edge_data.get("keywords", "")
2025
+ source_id = edge_data.get("source_id", "")
2026
+ weight = float(edge_data.get("weight", 1.0))
2027
+
2028
+ # Create new content for embedding
2029
+ content = f"{src}\t{tgt}\n{keywords}\n{description}"
2030
+
2031
+ # Calculate relationship ID
2032
+ relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
2033
+
2034
+ # Prepare data for vector database update
2035
+ relation_data = {
2036
+ relation_id: {
2037
+ "content": content,
2038
+ "src_id": src,
2039
+ "tgt_id": tgt,
2040
+ "source_id": source_id,
2041
+ "description": description,
2042
+ "keywords": keywords,
2043
+ "weight": weight,
2044
+ }
2045
  }
 
2046
 
2047
+ # Update vector database
2048
+ await self.relationships_vdb.upsert(relation_data)
2049
 
2050
+ # Update working entity name to new name
2051
+ entity_name = new_entity_name
2052
+ else:
2053
+ # If not renaming, directly update node data
2054
+ await self.chunk_entity_relation_graph.upsert_node(
2055
+ entity_name, new_node_data
2056
+ )
2057
 
2058
+ # 3. Recalculate entity's vector representation and update vector database
2059
+ description = new_node_data.get("description", "")
2060
+ source_id = new_node_data.get("source_id", "")
2061
+ entity_type = new_node_data.get("entity_type", "")
2062
+ content = entity_name + "\n" + description
2063
 
2064
+ # Calculate entity ID
2065
+ entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2066
 
2067
+ # Prepare data for vector database update
2068
+ entity_data = {
2069
+ entity_id: {
2070
+ "content": content,
2071
+ "entity_name": entity_name,
2072
+ "source_id": source_id,
2073
+ "description": description,
2074
+ "entity_type": entity_type,
2075
+ }
2076
  }
 
2077
 
2078
+ # Update vector database
2079
+ await self.entities_vdb.upsert(entity_data)
2080
 
2081
+ # 4. Save changes
2082
+ await self._edit_entity_done()
2083
 
2084
+ logger.info(f"Entity '{entity_name}' successfully updated")
2085
+ return await self.get_entity_info(entity_name, include_vector_data=True)
2086
+ except Exception as e:
2087
+ logger.error(f"Error while editing entity '{entity_name}': {e}")
2088
+ raise
2089
 
2090
  def edit_entity(
2091
  self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
 
2120
  ]
2121
  )
2122
 
 
2123
  async def aedit_relation(
2124
  self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
2125
  ) -> dict[str, Any]:
 
2135
  Returns:
2136
  Dictionary containing updated relation information
2137
  """
2138
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
2139
+ # Use graph database lock to ensure atomic graph and vector db operations
2140
+ async with graph_db_lock:
2141
+ try:
2142
+ # 1. Get current relation information
2143
+ edge_exists = await self.chunk_entity_relation_graph.has_edge(
2144
+ source_entity, target_entity
2145
+ )
2146
+ if not edge_exists:
2147
+ raise ValueError(
2148
+ f"Relation from '{source_entity}' to '{target_entity}' does not exist"
2149
+ )
2150
+ edge_data = await self.chunk_entity_relation_graph.get_edge(
2151
+ source_entity, target_entity
2152
+ )
2153
+ # Important: First delete the old relation record from the vector database
2154
+ old_relation_id = compute_mdhash_id(
2155
+ source_entity + target_entity, prefix="rel-"
2156
+ )
2157
+ await self.relationships_vdb.delete([old_relation_id])
2158
+ logger.info(
2159
+ f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
2160
  )
 
 
 
 
 
 
 
 
 
 
 
2161
 
2162
+ # 2. Update relation information in the graph
2163
+ new_edge_data = {**edge_data, **updated_data}
2164
+ await self.chunk_entity_relation_graph.upsert_edge(
2165
+ source_entity, target_entity, new_edge_data
2166
+ )
2167
 
2168
+ # 3. Recalculate relation's vector representation and update vector database
2169
+ description = new_edge_data.get("description", "")
2170
+ keywords = new_edge_data.get("keywords", "")
2171
+ source_id = new_edge_data.get("source_id", "")
2172
+ weight = float(new_edge_data.get("weight", 1.0))
2173
 
2174
+ # Create content for embedding
2175
+ content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}"
2176
 
2177
+ # Calculate relation ID
2178
+ relation_id = compute_mdhash_id(
2179
+ source_entity + target_entity, prefix="rel-"
2180
+ )
2181
 
2182
+ # Prepare data for vector database update
2183
+ relation_data = {
2184
+ relation_id: {
2185
+ "content": content,
2186
+ "src_id": source_entity,
2187
+ "tgt_id": target_entity,
2188
+ "source_id": source_id,
2189
+ "description": description,
2190
+ "keywords": keywords,
2191
+ "weight": weight,
2192
+ }
2193
  }
 
2194
 
2195
+ # Update vector database
2196
+ await self.relationships_vdb.upsert(relation_data)
2197
 
2198
+ # 4. Save changes
2199
+ await self._edit_relation_done()
2200
 
2201
+ logger.info(
2202
+ f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
2203
+ )
2204
+ return await self.get_relation_info(
2205
+ source_entity, target_entity, include_vector_data=True
2206
+ )
2207
+ except Exception as e:
2208
+ logger.error(
2209
+ f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}"
2210
+ )
2211
+ raise
2212
 
2213
  def edit_relation(
2214
  self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
 
2256
  Returns:
2257
  Dictionary containing created entity information
2258
  """
2259
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
2260
+ # Use graph database lock to ensure atomic graph and vector db operations
2261
+ async with graph_db_lock:
2262
+ try:
2263
+ # Check if entity already exists
2264
+ existing_node = await self.chunk_entity_relation_graph.has_node(entity_name)
2265
+ if existing_node:
2266
+ raise ValueError(f"Entity '{entity_name}' already exists")
 
 
 
 
 
2267
 
2268
+ # Prepare node data with defaults if missing
2269
+ node_data = {
2270
+ "entity_id": entity_name,
2271
+ "entity_type": entity_data.get("entity_type", "UNKNOWN"),
2272
+ "description": entity_data.get("description", ""),
2273
+ "source_id": entity_data.get("source_id", "manual"),
2274
+ }
2275
 
2276
+ # Add entity to knowledge graph
2277
+ await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data)
 
 
 
2278
 
2279
+ # Prepare content for entity
2280
+ description = node_data.get("description", "")
2281
+ source_id = node_data.get("source_id", "")
2282
+ entity_type = node_data.get("entity_type", "")
2283
+ content = entity_name + "\n" + description
2284
 
2285
+ # Calculate entity ID
2286
+ entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2287
+
2288
+ # Prepare data for vector database update
2289
+ entity_data_for_vdb = {
2290
+ entity_id: {
2291
+ "content": content,
2292
+ "entity_name": entity_name,
2293
+ "source_id": source_id,
2294
+ "description": description,
2295
+ "entity_type": entity_type,
2296
+ }
2297
  }
 
2298
 
2299
+ # Update vector database
2300
+ await self.entities_vdb.upsert(entity_data_for_vdb)
2301
 
2302
+ # Save changes
2303
+ await self._edit_entity_done()
2304
 
2305
+ logger.info(f"Entity '{entity_name}' successfully created")
2306
+ return await self.get_entity_info(entity_name, include_vector_data=True)
2307
+ except Exception as e:
2308
+ logger.error(f"Error while creating entity '{entity_name}': {e}")
2309
+ raise
2310
 
2311
  def create_entity(
2312
  self, entity_name: str, entity_data: dict[str, Any]
 
2339
  Returns:
2340
  Dictionary containing created relation information
2341
  """
2342
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
2343
+ # Use graph database lock to ensure atomic graph and vector db operations
2344
+ async with graph_db_lock:
2345
+ try:
2346
+ # Check if both entities exist
2347
+ source_exists = await self.chunk_entity_relation_graph.has_node(
2348
+ source_entity
2349
+ )
2350
+ target_exists = await self.chunk_entity_relation_graph.has_node(
2351
+ target_entity
2352
+ )
2353
 
2354
+ if not source_exists:
2355
+ raise ValueError(f"Source entity '{source_entity}' does not exist")
2356
+ if not target_exists:
2357
+ raise ValueError(f"Target entity '{target_entity}' does not exist")
2358
 
2359
+ # Check if relation already exists
2360
+ existing_edge = await self.chunk_entity_relation_graph.has_edge(
2361
+ source_entity, target_entity
 
 
 
 
2362
  )
2363
+ if existing_edge:
2364
+ raise ValueError(
2365
+ f"Relation from '{source_entity}' to '{target_entity}' already exists"
2366
+ )
2367
 
2368
+ # Prepare edge data with defaults if missing
2369
+ edge_data = {
2370
+ "description": relation_data.get("description", ""),
2371
+ "keywords": relation_data.get("keywords", ""),
2372
+ "source_id": relation_data.get("source_id", "manual"),
2373
+ "weight": float(relation_data.get("weight", 1.0)),
2374
+ }
2375
 
2376
+ # Add relation to knowledge graph
2377
+ await self.chunk_entity_relation_graph.upsert_edge(
2378
+ source_entity, target_entity, edge_data
2379
+ )
2380
 
2381
+ # Prepare content for embedding
2382
+ description = edge_data.get("description", "")
2383
+ keywords = edge_data.get("keywords", "")
2384
+ source_id = edge_data.get("source_id", "")
2385
+ weight = edge_data.get("weight", 1.0)
2386
 
2387
+ # Create content for embedding
2388
+ content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
2389
 
2390
+ # Calculate relation ID
2391
+ relation_id = compute_mdhash_id(
2392
+ source_entity + target_entity, prefix="rel-"
2393
+ )
2394
 
2395
+ # Prepare data for vector database update
2396
+ relation_data_for_vdb = {
2397
+ relation_id: {
2398
+ "content": content,
2399
+ "src_id": source_entity,
2400
+ "tgt_id": target_entity,
2401
+ "source_id": source_id,
2402
+ "description": description,
2403
+ "keywords": keywords,
2404
+ "weight": weight,
2405
+ }
2406
  }
 
2407
 
2408
+ # Update vector database
2409
+ await self.relationships_vdb.upsert(relation_data_for_vdb)
2410
 
2411
+ # Save changes
2412
+ await self._edit_relation_done()
2413
 
2414
+ logger.info(
2415
+ f"Relation from '{source_entity}' to '{target_entity}' successfully created"
2416
+ )
2417
+ return await self.get_relation_info(
2418
+ source_entity, target_entity, include_vector_data=True
2419
+ )
2420
+ except Exception as e:
2421
+ logger.error(
2422
+ f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}"
2423
+ )
2424
+ raise
2425
 
2426
  def create_relation(
2427
  self, source_entity: str, target_entity: str, relation_data: dict[str, Any]
 
2443
  self.acreate_relation(source_entity, target_entity, relation_data)
2444
  )
2445
 
 
2446
  async def amerge_entities(
2447
  self,
2448
  source_entities: list[str],
 
2470
  Returns:
2471
  Dictionary containing the merged entity information
2472
  """
2473
+ graph_db_lock = get_graph_db_lock(enable_logging=False)
2474
+ # Use graph database lock to ensure atomic graph and vector db operations
2475
+ async with graph_db_lock:
2476
+ try:
2477
+ # Default merge strategy
2478
+ default_strategy = {
2479
+ "description": "concatenate",
2480
+ "entity_type": "keep_first",
2481
+ "source_id": "join_unique",
2482
+ }
 
 
 
 
 
 
2483
 
2484
+ merge_strategy = (
2485
+ default_strategy
2486
+ if merge_strategy is None
2487
+ else {**default_strategy, **merge_strategy}
2488
+ )
2489
+ target_entity_data = (
2490
+ {} if target_entity_data is None else target_entity_data
2491
  )
 
 
 
 
2492
 
2493
+ # 1. Check if all source entities exist
2494
+ source_entities_data = {}
2495
+ for entity_name in source_entities:
2496
+ node_exists = await self.chunk_entity_relation_graph.has_node(
2497
+ entity_name
2498
+ )
2499
+ if not node_exists:
2500
+ raise ValueError(f"Source entity '{entity_name}' does not exist")
2501
+ node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
2502
+ source_entities_data[entity_name] = node_data
2503
+
2504
+ # 2. Check if target entity exists and get its data if it does
2505
+ target_exists = await self.chunk_entity_relation_graph.has_node(
2506
+ target_entity
2507
  )
2508
+ existing_target_entity_data = {}
2509
+ if target_exists:
2510
+ existing_target_entity_data = (
2511
+ await self.chunk_entity_relation_graph.get_node(target_entity)
2512
+ )
2513
+ logger.info(
2514
+ f"Target entity '{target_entity}' already exists, will merge data"
2515
+ )
2516
+
2517
+ # 3. Merge entity data
2518
+ merged_entity_data = self._merge_entity_attributes(
2519
+ list(source_entities_data.values())
2520
+ + ([existing_target_entity_data] if target_exists else []),
2521
+ merge_strategy,
2522
  )
2523
 
2524
+ # Apply any explicitly provided target entity data (overrides merged data)
2525
+ for key, value in target_entity_data.items():
2526
+ merged_entity_data[key] = value
 
 
 
2527
 
2528
+ # 4. Get all relationships of the source entities
2529
+ all_relations = []
2530
+ for entity_name in source_entities:
2531
+ # Get all relationships of the source entities
2532
+ edges = await self.chunk_entity_relation_graph.get_node_edges(
2533
+ entity_name
2534
+ )
2535
+ if edges:
2536
+ for src, tgt in edges:
2537
+ # Ensure src is the current entity
2538
+ if src == entity_name:
2539
+ edge_data = await self.chunk_entity_relation_graph.get_edge(
2540
+ src, tgt
2541
+ )
2542
+ all_relations.append((src, tgt, edge_data))
2543
 
2544
+ # 5. Create or update the target entity
2545
+ merged_entity_data["entity_id"] = target_entity
2546
+ if not target_exists:
2547
+ await self.chunk_entity_relation_graph.upsert_node(
2548
+ target_entity, merged_entity_data
2549
+ )
2550
+ logger.info(f"Created new target entity '{target_entity}'")
2551
+ else:
2552
+ await self.chunk_entity_relation_graph.upsert_node(
2553
+ target_entity, merged_entity_data
2554
+ )
2555
+ logger.info(f"Updated existing target entity '{target_entity}'")
 
 
 
2556
 
2557
+ # 6. Recreate all relationships, pointing to the target entity
2558
+ relation_updates = {} # Track relationships that need to be merged
2559
+ relations_to_delete = []
 
 
 
 
 
 
 
 
 
2560
 
2561
+ for src, tgt, edge_data in all_relations:
2562
+ relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-"))
2563
+ relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-"))
2564
+ new_src = target_entity if src in source_entities else src
2565
+ new_tgt = target_entity if tgt in source_entities else tgt
2566
 
2567
+ # Skip relationships between source entities to avoid self-loops
2568
+ if new_src == new_tgt:
2569
+ logger.info(
2570
+ f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
2571
+ )
2572
+ continue
2573
+
2574
+ # Check if the same relationship already exists
2575
+ relation_key = f"{new_src}|{new_tgt}"
2576
+ if relation_key in relation_updates:
2577
+ # Merge relationship data
2578
+ existing_data = relation_updates[relation_key]["data"]
2579
+ merged_relation = self._merge_relation_attributes(
2580
+ [existing_data, edge_data],
2581
+ {
2582
+ "description": "concatenate",
2583
+ "keywords": "join_unique",
2584
+ "source_id": "join_unique",
2585
+ "weight": "max",
2586
+ },
2587
+ )
2588
+ relation_updates[relation_key]["data"] = merged_relation
2589
+ logger.info(
2590
+ f"Merged duplicate relationship: {new_src} -> {new_tgt}"
2591
+ )
2592
+ else:
2593
+ relation_updates[relation_key] = {
2594
+ "src": new_src,
2595
+ "tgt": new_tgt,
2596
+ "data": edge_data.copy(),
2597
+ }
2598
 
2599
+ # Apply relationship updates
2600
+ for rel_data in relation_updates.values():
2601
+ await self.chunk_entity_relation_graph.upsert_edge(
2602
+ rel_data["src"], rel_data["tgt"], rel_data["data"]
2603
+ )
2604
  logger.info(
2605
+ f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
2606
  )
 
2607
 
2608
+ # Delete relationships records from vector database
2609
+ await self.relationships_vdb.delete(relations_to_delete)
 
 
 
 
 
 
 
 
 
 
 
 
 
2610
  logger.info(
2611
+ f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database"
2612
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2613
 
2614
+ # 7. Update entity vector representation
2615
+ description = merged_entity_data.get("description", "")
2616
+ source_id = merged_entity_data.get("source_id", "")
2617
+ entity_type = merged_entity_data.get("entity_type", "")
2618
+ content = target_entity + "\n" + description
2619
 
2620
+ entity_id = compute_mdhash_id(target_entity, prefix="ent-")
2621
+ entity_data_for_vdb = {
2622
+ entity_id: {
2623
+ "content": content,
2624
+ "entity_name": target_entity,
2625
+ "source_id": source_id,
2626
+ "description": description,
2627
+ "entity_type": entity_type,
2628
+ }
 
 
 
 
 
2629
  }
 
2630
 
2631
+ await self.entities_vdb.upsert(entity_data_for_vdb)
2632
 
2633
+ # 8. Update relationship vector representations
2634
+ for rel_data in relation_updates.values():
2635
+ src = rel_data["src"]
2636
+ tgt = rel_data["tgt"]
2637
+ edge_data = rel_data["data"]
2638
 
2639
+ description = edge_data.get("description", "")
2640
+ keywords = edge_data.get("keywords", "")
2641
+ source_id = edge_data.get("source_id", "")
2642
+ weight = float(edge_data.get("weight", 1.0))
2643
 
2644
+ content = f"{keywords}\t{src}\n{tgt}\n{description}"
2645
+ relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
2646
 
2647
+ relation_data_for_vdb = {
2648
+ relation_id: {
2649
+ "content": content,
2650
+ "src_id": src,
2651
+ "tgt_id": tgt,
2652
+ "source_id": source_id,
2653
+ "description": description,
2654
+ "keywords": keywords,
2655
+ "weight": weight,
2656
+ }
2657
  }
 
2658
 
2659
+ await self.relationships_vdb.upsert(relation_data_for_vdb)
2660
+
2661
+ # 9. Delete source entities
2662
+ for entity_name in source_entities:
2663
+ if entity_name == target_entity:
2664
+ logger.info(
2665
+ f"Skipping deletion of '{entity_name}' as it's also the target entity"
2666
+ )
2667
+ continue
2668
+
2669
+ # Delete entity node from knowledge graph
2670
+ await self.chunk_entity_relation_graph.delete_node(entity_name)
2671
+
2672
+ # Delete entity record from vector database
2673
+ entity_id = compute_mdhash_id(entity_name, prefix="ent-")
2674
+ await self.entities_vdb.delete([entity_id])
2675
 
 
 
 
2676
  logger.info(
2677
+ f"Deleted source entity '{entity_name}' and its vector embedding from database"
2678
  )
 
2679
 
2680
+ # 10. Save changes
2681
+ await self._merge_entities_done()
 
 
 
 
2682
 
2683
  logger.info(
2684
+ f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
2685
  )
2686
+ return await self.get_entity_info(target_entity, include_vector_data=True)
2687
 
2688
+ except Exception as e:
2689
+ logger.error(f"Error merging entities: {e}")
2690
+ raise
 
 
 
 
 
 
 
 
2691
 
2692
  async def aexport_data(
2693
  self,