yangdx commited on
Commit
803d479
·
1 Parent(s): 49a0819

Add async lock for atomic graph database operations

Browse files

• Introduced graph_db_lock mechanism
• Ensured atomic node/edge merge and insert operation

Files changed (1) hide show
  1. lightrag/operate.py +17 -12
lightrag/operate.py CHANGED
@@ -519,19 +519,24 @@ async def extract_entities(
519
  for k, v in m_edges.items():
520
  maybe_edges[tuple(sorted(k))].extend(v)
521
 
522
- all_entities_data = await asyncio.gather(
523
- *[
524
- _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
525
- for k, v in maybe_nodes.items()
526
- ]
527
- )
 
 
 
 
 
528
 
529
- all_relationships_data = await asyncio.gather(
530
- *[
531
- _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config)
532
- for k, v in maybe_edges.items()
533
- ]
534
- )
535
 
536
  if not (all_entities_data or all_relationships_data):
537
  log_message = "Didn't extract any entities and relationships."
 
519
  for k, v in m_edges.items():
520
  maybe_edges[tuple(sorted(k))].extend(v)
521
 
522
+ from .kg.shared_storage import get_graph_db_lock
523
+ graph_db_lock = get_graph_db_lock(enable_logging = True)
524
+
525
+ # Ensure that nodes and edges are merged and upserted atomically
526
+ async with graph_db_lock:
527
+ all_entities_data = await asyncio.gather(
528
+ *[
529
+ _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
530
+ for k, v in maybe_nodes.items()
531
+ ]
532
+ )
533
 
534
+ all_relationships_data = await asyncio.gather(
535
+ *[
536
+ _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config)
537
+ for k, v in maybe_edges.items()
538
+ ]
539
+ )
540
 
541
  if not (all_entities_data or all_relationships_data):
542
  log_message = "Didn't extract any entities and relationships."