gzdaniel committed on
Commit
f439a2a
·
1 Parent(s): e028351

Fix JSON handling error for PostgreSQL graph storage

Browse files
Files changed (1) hide show
  1. lightrag/kg/postgres_impl.py +57 -23
lightrag/kg/postgres_impl.py CHANGED
@@ -1372,6 +1372,14 @@ class PGGraphStorage(BaseGraphStorage):
1372
  if record:
1373
  node = record[0]
1374
  node_dict = node["n"]["properties"]
 
 
 
 
 
 
 
 
1375
 
1376
  return node_dict
1377
  return None
@@ -1420,7 +1428,15 @@ class PGGraphStorage(BaseGraphStorage):
1420
  record = await self._query(query)
1421
  if record and record[0] and record[0]["edge_properties"]:
1422
  result = record[0]["edge_properties"]
1423
-
 
 
 
 
 
 
 
 
1424
  return result
1425
 
1426
  async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@@ -1432,9 +1448,9 @@ class PGGraphStorage(BaseGraphStorage):
1432
 
1433
  query = """SELECT * FROM cypher('%s', $$
1434
  MATCH (n:base {entity_id: "%s"})
1435
- OPTIONAL MATCH (n)-[]-(connected)
1436
- RETURN n, connected
1437
- $$) AS (n agtype, connected agtype)""" % (
1438
  self.graph_name,
1439
  label,
1440
  )
@@ -1442,20 +1458,11 @@ class PGGraphStorage(BaseGraphStorage):
1442
  results = await self._query(query)
1443
  edges = []
1444
  for record in results:
1445
- source_node = record["n"] if record["n"] else None
1446
- connected_node = record["connected"] if record["connected"] else None
1447
 
1448
- if (
1449
- source_node
1450
- and connected_node
1451
- and "properties" in source_node
1452
- and "properties" in connected_node
1453
- ):
1454
- source_label = source_node["properties"].get("entity_id")
1455
- target_label = connected_node["properties"].get("entity_id")
1456
-
1457
- if source_label and target_label:
1458
- edges.append((source_label, target_label))
1459
 
1460
  return edges
1461
 
@@ -1638,6 +1645,15 @@ class PGGraphStorage(BaseGraphStorage):
1638
  for result in results:
1639
  if result["node_id"] and result["n"]:
1640
  node_dict = result["n"]["properties"]
 
 
 
 
 
 
 
 
 
1641
  # Remove the 'base' label if present in a 'labels' property
1642
  if "labels" in node_dict:
1643
  node_dict["labels"] = [
@@ -1789,15 +1805,33 @@ class PGGraphStorage(BaseGraphStorage):
1789
 
1790
  for result in forward_results:
1791
  if result["source"] and result["target"] and result["edge_properties"]:
1792
- edges_dict[(result["source"], result["target"])] = result[
1793
- "edge_properties"
1794
- ]
 
 
 
 
 
 
 
 
 
1795
 
1796
  for result in backward_results:
1797
  if result["source"] and result["target"] and result["edge_properties"]:
1798
- edges_dict[(result["source"], result["target"])] = result[
1799
- "edge_properties"
1800
- ]
 
 
 
 
 
 
 
 
 
1801
 
1802
  return edges_dict
1803
 
 
1372
  if record:
1373
  node = record[0]
1374
  node_dict = node["n"]["properties"]
1375
+
1376
+ # Process string result, parse it to JSON dictionary
1377
+ if isinstance(node_dict, str):
1378
+ try:
1379
+ import json
1380
+ node_dict = json.loads(node_dict)
1381
+ except json.JSONDecodeError:
1382
+ logger.warning(f"Failed to parse node string: {node_dict}")
1383
 
1384
  return node_dict
1385
  return None
 
1428
  record = await self._query(query)
1429
  if record and record[0] and record[0]["edge_properties"]:
1430
  result = record[0]["edge_properties"]
1431
+
1432
+ # Process string result, parse it to JSON dictionary
1433
+ if isinstance(result, str):
1434
+ try:
1435
+ import json
1436
+ result = json.loads(result)
1437
+ except json.JSONDecodeError:
1438
+ logger.warning(f"Failed to parse edge string: {result}")
1439
+
1440
  return result
1441
 
1442
  async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
 
1448
 
1449
  query = """SELECT * FROM cypher('%s', $$
1450
  MATCH (n:base {entity_id: "%s"})
1451
+ OPTIONAL MATCH (n)-[]-(connected:base)
1452
+ RETURN n.entity_id AS source_id, connected.entity_id AS connected_id
1453
+ $$) AS (source_id text, connected_id text)""" % (
1454
  self.graph_name,
1455
  label,
1456
  )
 
1458
  results = await self._query(query)
1459
  edges = []
1460
  for record in results:
1461
+ source_id = record["source_id"]
1462
+ connected_id = record["connected_id"]
1463
 
1464
+ if source_id and connected_id:
1465
+ edges.append((source_id, connected_id))
 
 
 
 
 
 
 
 
 
1466
 
1467
  return edges
1468
 
 
1645
  for result in results:
1646
  if result["node_id"] and result["n"]:
1647
  node_dict = result["n"]["properties"]
1648
+
1649
+ # Process string result, parse it to JSON dictionary
1650
+ if isinstance(node_dict, str):
1651
+ try:
1652
+ import json
1653
+ node_dict = json.loads(node_dict)
1654
+ except json.JSONDecodeError:
1655
+ logger.warning(f"Failed to parse node string in batch: {node_dict}")
1656
+
1657
  # Remove the 'base' label if present in a 'labels' property
1658
  if "labels" in node_dict:
1659
  node_dict["labels"] = [
 
1805
 
1806
  for result in forward_results:
1807
  if result["source"] and result["target"] and result["edge_properties"]:
1808
+ edge_props = result["edge_properties"]
1809
+
1810
+ # Process string result, parse it to JSON dictionary
1811
+ if isinstance(edge_props, str):
1812
+ try:
1813
+ import json
1814
+ edge_props = json.loads(edge_props)
1815
+ except json.JSONDecodeError:
1816
+ logger.warning(f"Failed to parse edge properties string: {edge_props}")
1817
+ continue
1818
+
1819
+ edges_dict[(result["source"], result["target"])] = edge_props
1820
 
1821
  for result in backward_results:
1822
  if result["source"] and result["target"] and result["edge_properties"]:
1823
+ edge_props = result["edge_properties"]
1824
+
1825
+ # Process string result, parse it to JSON dictionary
1826
+ if isinstance(edge_props, str):
1827
+ try:
1828
+ import json
1829
+ edge_props = json.loads(edge_props)
1830
+ except json.JSONDecodeError:
1831
+ logger.warning(f"Failed to parse edge properties string: {edge_props}")
1832
+ continue
1833
+
1834
+ edges_dict[(result["source"], result["target"])] = edge_props
1835
 
1836
  return edges_dict
1837