yangdx commited on
Commit
22d3f51
·
1 Parent(s): a313a22

Add duplicate edge upsert checking and logging

Browse files
Files changed (1) hide show
  1. lightrag/kg/neo4j_impl.py +44 -34
lightrag/kg/neo4j_impl.py CHANGED
@@ -412,9 +412,7 @@ class Neo4JStorage(BaseGraphStorage):
412
 
413
  result = await session.run(query)
414
  try:
415
- records = await result.fetch(
416
- 2
417
- ) # Get up to 2 records to check for duplicates
418
 
419
  if len(records) > 1:
420
  logger.warning(
@@ -552,20 +550,20 @@ class Neo4JStorage(BaseGraphStorage):
552
  label = self._ensure_label(node_id)
553
  properties = node_data
554
 
555
- async def _do_upsert(tx: AsyncManagedTransaction):
556
- query = f"""
557
- MERGE (n:`{label}`)
558
- SET n += $properties
559
- """
560
- result = await tx.run(query, properties=properties)
561
- logger.debug(
562
- f"Upserted node with label '{label}' and properties: {properties}"
563
- )
564
- await result.consume() # Ensure result is fully consumed
565
-
566
  try:
567
  async with self._driver.session(database=self._DATABASE) as session:
568
- await session.execute_write(_do_upsert)
 
 
 
 
 
 
 
 
 
 
 
569
  except Exception as e:
570
  logger.error(f"Error during upsert: {str(e)}")
571
  raise
@@ -614,27 +612,39 @@ class Neo4JStorage(BaseGraphStorage):
614
  f"Neo4j: target node with label '{target_label}' does not exist"
615
  )
616
 
617
- async def _do_upsert_edge(tx: AsyncManagedTransaction):
618
- query = f"""
619
- MATCH (source:`{source_label}`)
620
- WITH source
621
- MATCH (target:`{target_label}`)
622
- MERGE (source)-[r:DIRECTED]-(target)
623
- SET r += $properties
624
- RETURN r
625
- """
626
- result = await tx.run(query, properties=edge_properties)
627
- try:
628
- record = await result.single()
629
- logger.debug(
630
- f"Upserted edge from '{source_label}' to '{target_label}' with properties: {edge_properties}, result: {record['r'] if record else None}"
631
- )
632
- finally:
633
- await result.consume() # Ensure result is consumed
634
-
635
  try:
636
  async with self._driver.session(database=self._DATABASE) as session:
637
- await session.execute_write(_do_upsert_edge)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
638
  except Exception as e:
639
  logger.error(f"Error during edge upsert: {str(e)}")
640
  raise
 
412
 
413
  result = await session.run(query)
414
  try:
415
+ records = await result.fetch(2)
 
 
416
 
417
  if len(records) > 1:
418
  logger.warning(
 
550
  label = self._ensure_label(node_id)
551
  properties = node_data
552
 
 
 
 
 
 
 
 
 
 
 
 
553
  try:
554
  async with self._driver.session(database=self._DATABASE) as session:
555
+ async def execute_upsert(tx: AsyncManagedTransaction):
556
+ query = f"""
557
+ MERGE (n:`{label}`)
558
+ SET n += $properties
559
+ """
560
+ result = await tx.run(query, properties=properties)
561
+ logger.debug(
562
+ f"Upserted node with label '{label}' and properties: {properties}"
563
+ )
564
+ await result.consume() # Ensure result is fully consumed
565
+
566
+ await session.execute_write(execute_upsert)
567
  except Exception as e:
568
  logger.error(f"Error during upsert: {str(e)}")
569
  raise
 
612
  f"Neo4j: target node with label '{target_label}' does not exist"
613
  )
614
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
615
  try:
616
  async with self._driver.session(database=self._DATABASE) as session:
617
+ async def execute_upsert(tx: AsyncManagedTransaction):
618
+ query = f"""
619
+ MATCH (source:`{source_label}`)
620
+ WITH source
621
+ MATCH (target:`{target_label}`)
622
+ MERGE (source)-[r:DIRECTED]-(target)
623
+ SET r += $properties
624
+ RETURN r, source, target
625
+ """
626
+ result = await tx.run(query, properties=edge_properties)
627
+ try:
628
+ records = await result.fetch(100)
629
+ if len(records) > 1:
630
+ source_nodes = [dict(r['source']) for r in records]
631
+ target_nodes = [dict(r['target']) for r in records]
632
+ logger.warning(
633
+ f"Multiple edges created: found {len(records)} results for edge between "
634
+ f"source label '{source_label}' and target label '{target_label}'. "
635
+ f"Source nodes: {source_nodes}, "
636
+ f"Target nodes: {target_nodes}. "
637
+ "Using first edge only."
638
+ )
639
+ if records:
640
+ logger.debug(
641
+ f"Upserted edge from '{source_label}' to '{target_label}' "
642
+ f"with properties: {edge_properties}"
643
+ )
644
+ finally:
645
+ await result.consume() # Ensure result is consumed
646
+
647
+ await session.execute_write(execute_upsert)
648
  except Exception as e:
649
  logger.error(f"Error during edge upsert: {str(e)}")
650
  raise