yangdx commited on
Commit
804cf52
·
1 Parent(s): 99ece0e

Fix data persistence issue in single-process mode

Browse files

In single-process mode, data updates and persistence were not working properly because the update flags were not being correctly handled between different objects.

lightrag/kg/json_doc_status_impl.py CHANGED
@@ -20,7 +20,6 @@ from .shared_storage import (
20
  set_all_update_flags,
21
  clear_all_update_flags,
22
  try_initialize_namespace,
23
- is_multiprocess,
24
  )
25
 
26
 
@@ -96,9 +95,7 @@ class JsonDocStatusStorage(DocStatusStorage):
96
 
97
  async def index_done_callback(self) -> None:
98
  async with self._storage_lock:
99
- if (is_multiprocess and self.storage_updated.value) or (
100
- not is_multiprocess and self.storage_updated
101
- ):
102
  data_dict = (
103
  dict(self._data) if hasattr(self._data, "_getvalue") else self._data
104
  )
 
20
  set_all_update_flags,
21
  clear_all_update_flags,
22
  try_initialize_namespace,
 
23
  )
24
 
25
 
 
95
 
96
  async def index_done_callback(self) -> None:
97
  async with self._storage_lock:
98
+ if self.storage_updated.value:
 
 
99
  data_dict = (
100
  dict(self._data) if hasattr(self._data, "_getvalue") else self._data
101
  )
lightrag/kg/json_kv_impl.py CHANGED
@@ -18,7 +18,6 @@ from .shared_storage import (
18
  set_all_update_flags,
19
  clear_all_update_flags,
20
  try_initialize_namespace,
21
- is_multiprocess,
22
  )
23
 
24
 
@@ -63,9 +62,7 @@ class JsonKVStorage(BaseKVStorage):
63
 
64
  async def index_done_callback(self) -> None:
65
  async with self._storage_lock:
66
- if (is_multiprocess and self.storage_updated.value) or (
67
- not is_multiprocess and self.storage_updated
68
- ):
69
  data_dict = (
70
  dict(self._data) if hasattr(self._data, "_getvalue") else self._data
71
  )
 
18
  set_all_update_flags,
19
  clear_all_update_flags,
20
  try_initialize_namespace,
 
21
  )
22
 
23
 
 
62
 
63
  async def index_done_callback(self) -> None:
64
  async with self._storage_lock:
65
+ if self.storage_updated.value:
 
 
66
  data_dict = (
67
  dict(self._data) if hasattr(self._data, "_getvalue") else self._data
68
  )
lightrag/kg/shared_storage.py CHANGED
@@ -322,7 +322,12 @@ async def get_update_flag(namespace: str):
322
  if is_multiprocess and _manager is not None:
323
  new_update_flag = _manager.Value("b", False)
324
  else:
325
- new_update_flag = False
 
 
 
 
 
326
 
327
  _update_flags[namespace].append(new_update_flag)
328
  return new_update_flag
@@ -342,7 +347,8 @@ async def set_all_update_flags(namespace: str):
342
  if is_multiprocess:
343
  _update_flags[namespace][i].value = True
344
  else:
345
- _update_flags[namespace][i] = True
 
346
 
347
 
348
  async def clear_all_update_flags(namespace: str):
@@ -359,7 +365,8 @@ async def clear_all_update_flags(namespace: str):
359
  if is_multiprocess:
360
  _update_flags[namespace][i].value = False
361
  else:
362
- _update_flags[namespace][i] = False
 
363
 
364
 
365
  async def get_all_update_flags_status() -> Dict[str, list]:
 
322
  if is_multiprocess and _manager is not None:
323
  new_update_flag = _manager.Value("b", False)
324
  else:
325
+ # Create a simple mutable object to store boolean value for compatibility with mutiprocess
326
+ class MutableBoolean:
327
+ def __init__(self, initial_value=False):
328
+ self.value = initial_value
329
+
330
+ new_update_flag = MutableBoolean(False)
331
 
332
  _update_flags[namespace].append(new_update_flag)
333
  return new_update_flag
 
347
  if is_multiprocess:
348
  _update_flags[namespace][i].value = True
349
  else:
350
+ # Use .value attribute instead of direct assignment
351
+ _update_flags[namespace][i].value = True
352
 
353
 
354
  async def clear_all_update_flags(namespace: str):
 
365
  if is_multiprocess:
366
  _update_flags[namespace][i].value = False
367
  else:
368
+ # Use .value attribute instead of direct assignment
369
+ _update_flags[namespace][i].value = False
370
 
371
 
372
  async def get_all_update_flags_status() -> Dict[str, list]: