yangdx committed
Commit 5b87097 · 1 Parent(s): 422dbaf

Add process ID to log messages for better multi-process debugging clarity

- Add PID to KV and Neo4j storage logs
- Add PID to query context logs
- Improve KV data count logging for llm cache
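
The common thread across these changes is tagging log lines with the worker's process ID so that, when several LightRAG worker processes run side by side, interleaved log output can be attributed to the process that produced it. A minimal sketch of the pattern, assuming a plain `logging` logger; the logger name, namespace, and counts below are illustrative, not the exact strings from the diffs:

import logging
import os

logger = logging.getLogger("lightrag")  # illustrative logger name
logging.basicConfig(level=logging.INFO)

def log_with_pid(message: str) -> None:
    # Prefix every message with the current process ID so that lines from
    # different worker processes can be told apart when interleaved.
    logger.info(f"Process {os.getpid()} {message}")

log_with_pid("KV load full_docs with 42 records")
# e.g. INFO:lightrag:Process 12345 KV load full_docs with 42 records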

lightrag/kg/json_doc_status_impl.py CHANGED
@@ -40,7 +40,7 @@ class JsonDocStatusStorage(DocStatusStorage):
         async with self._storage_lock:
             self._data.update(loaded_data)
             logger.info(
-                f"Loaded document status storage with {len(loaded_data)} records"
+                f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
             )
 
     async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -90,6 +90,7 @@ class JsonDocStatusStorage(DocStatusStorage):
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+            logger.info(f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}")
             write_json(data_dict, self._file_name)
 
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
lightrag/kg/json_kv_impl.py CHANGED
@@ -35,13 +35,34 @@ class JsonKVStorage(BaseKVStorage):
         loaded_data = load_json(self._file_name) or {}
         async with self._storage_lock:
             self._data.update(loaded_data)
-            logger.info(f"Load KV {self.namespace} with {len(loaded_data)} data")
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(loaded_data)
+
+            logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in data_dict.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(data_dict)
+
+            logger.info(f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}")
             write_json(data_dict, self._file_name)
 
     async def get_all(self) -> dict[str, Any]:
@@ -73,12 +94,13 @@ class JsonKVStorage(BaseKVStorage):
             return set(keys) - set(self._data.keys())
 
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return
         async with self._storage_lock:
             left_data = {k: v for k, v in data.items() if k not in self._data}
-            self._data.update(left_data)
+            if left_data:
+                logger.info(f"Process {os.getpid()} KV inserting {len(left_data)} to {self.namespace}")
+                self._data.update(left_data)
 
     async def delete(self, ids: list[str]) -> None:
         async with self._storage_lock:
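
For cache namespaces, the on-disk JSON nests individual cached responses under first-level buckets, so `len(loaded_data)` would only count the buckets rather than the cached entries; the new branch sums the sizes of the nested dicts instead. The counting logic can be exercised in isolation; the data shape, bucket names, and namespace name below are illustrative, not taken from an actual cache file:

import logging
import os

logger = logging.getLogger("lightrag")  # illustrative logger name
logging.basicConfig(level=logging.INFO)

# Hypothetical shape of a cache namespace: two first-level buckets
# holding three cached entries in total.
loaded_data = {
    "bucket_a": {"hash1": {"return": "..."}, "hash2": {"return": "..."}},
    "bucket_b": {"hash3": {"return": "..."}},
}
namespace = "llm_response_cache"  # any namespace ending in "cache"

if namespace.endswith("cache"):
    # Sum the entries inside each first-level dict instead of counting buckets.
    data_count = sum(
        len(first_level_dict)
        for first_level_dict in loaded_data.values()
        if isinstance(first_level_dict, dict)
    )
else:
    # Non-cache namespaces keep the original top-level count.
    data_count = len(loaded_data)

logger.info(f"Process {os.getpid()} KV load {namespace} with {data_count} records")
# Reports 3 records instead of the 2 top-level buckets.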
lightrag/kg/neo4j_impl.py CHANGED
@@ -842,7 +842,7 @@ class Neo4JStorage(BaseGraphStorage):
                         seen_edges.add(edge_id)
 
                 logger.info(
-                    f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+                    f"Process {os.getpid()} graph query return: {len(result.nodes)} nodes, {len(result.edges)} edges"
                 )
             finally:
                 await result_set.consume()  # Ensure result set is consumed
lightrag/operate.py CHANGED
@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import json
 import re
+import os
 from typing import Any, AsyncIterator
 from collections import Counter, defaultdict
 
@@ -1027,6 +1028,7 @@ async def _build_query_context(
     text_chunks_db: BaseKVStorage,
     query_param: QueryParam,
 ):
+    logger.info(f"Process {os.getpid()} buidling query context...")
     if query_param.mode == "local":
         entities_context, relations_context, text_units_context = await _get_node_data(
             ll_keywords,