yangdx commited on
Commit
49a0819
·
1 Parent(s): 22d3f51

Add graph DB lock to shared storage system

Browse files

• Introduced new graph_db_lock
• Added detailed lock debugging output

Files changed (1) hide show
  1. lightrag/kg/shared_storage.py +69 -25
lightrag/kg/shared_storage.py CHANGED
@@ -7,12 +7,18 @@ from typing import Any, Dict, Optional, Union, TypeVar, Generic
7
 
8
 
9
  # Define a direct print function for critical logs that must be visible in all processes
10
- def direct_log(message, level="INFO"):
11
  """
12
  Log a message directly to stderr to ensure visibility in all processes,
13
  including the Gunicorn master process.
 
 
 
 
 
14
  """
15
- print(f"{level}: {message}", file=sys.stderr, flush=True)
 
16
 
17
 
18
  T = TypeVar("T")
@@ -32,55 +38,88 @@ _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
32
  _storage_lock: Optional[LockType] = None
33
  _internal_lock: Optional[LockType] = None
34
  _pipeline_status_lock: Optional[LockType] = None
 
35
 
36
 
37
  class UnifiedLock(Generic[T]):
38
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
39
 
40
- def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool):
41
  self._lock = lock
42
  self._is_async = is_async
 
 
 
43
 
44
  async def __aenter__(self) -> "UnifiedLock[T]":
45
- if self._is_async:
46
- await self._lock.acquire()
47
- else:
48
- self._lock.acquire()
49
- return self
 
 
 
 
 
 
50
 
51
  async def __aexit__(self, exc_type, exc_val, exc_tb):
52
- if self._is_async:
53
- self._lock.release()
54
- else:
55
- self._lock.release()
 
 
 
 
 
 
56
 
57
  def __enter__(self) -> "UnifiedLock[T]":
58
  """For backward compatibility"""
59
- if self._is_async:
60
- raise RuntimeError("Use 'async with' for shared_storage lock")
61
- self._lock.acquire()
62
- return self
 
 
 
 
 
 
63
 
64
  def __exit__(self, exc_type, exc_val, exc_tb):
65
  """For backward compatibility"""
66
- if self._is_async:
67
- raise RuntimeError("Use 'async with' for shared_storage lock")
68
- self._lock.release()
 
 
 
 
 
 
69
 
70
 
71
- def get_internal_lock() -> UnifiedLock:
72
  """return unified storage lock for data consistency"""
73
- return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess)
74
 
75
 
76
- def get_storage_lock() -> UnifiedLock:
77
  """return unified storage lock for data consistency"""
78
- return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess)
79
 
80
 
81
- def get_pipeline_status_lock() -> UnifiedLock:
82
  """return unified storage lock for data consistency"""
83
- return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess)
 
 
 
 
 
84
 
85
 
86
  def initialize_share_data(workers: int = 1):
@@ -108,6 +147,7 @@ def initialize_share_data(workers: int = 1):
108
  _storage_lock, \
109
  _internal_lock, \
110
  _pipeline_status_lock, \
 
111
  _shared_dicts, \
112
  _init_flags, \
113
  _initialized, \
@@ -128,6 +168,7 @@ def initialize_share_data(workers: int = 1):
128
  _internal_lock = _manager.Lock()
129
  _storage_lock = _manager.Lock()
130
  _pipeline_status_lock = _manager.Lock()
 
131
  _shared_dicts = _manager.dict()
132
  _init_flags = _manager.dict()
133
  _update_flags = _manager.dict()
@@ -139,6 +180,7 @@ def initialize_share_data(workers: int = 1):
139
  _internal_lock = asyncio.Lock()
140
  _storage_lock = asyncio.Lock()
141
  _pipeline_status_lock = asyncio.Lock()
 
142
  _shared_dicts = {}
143
  _init_flags = {}
144
  _update_flags = {}
@@ -304,6 +346,7 @@ def finalize_share_data():
304
  _storage_lock, \
305
  _internal_lock, \
306
  _pipeline_status_lock, \
 
307
  _shared_dicts, \
308
  _init_flags, \
309
  _initialized, \
@@ -369,6 +412,7 @@ def finalize_share_data():
369
  _storage_lock = None
370
  _internal_lock = None
371
  _pipeline_status_lock = None
 
372
  _update_flags = None
373
 
374
  direct_log(f"Process {os.getpid()} storage data finalization complete")
 
7
 
8
 
9
  # Define a direct print function for critical logs that must be visible in all processes
10
+ def direct_log(message, level="INFO", enable_output: bool = True):
11
  """
12
  Log a message directly to stderr to ensure visibility in all processes,
13
  including the Gunicorn master process.
14
+
15
+ Args:
16
+ message: The message to log
17
+ level: Log level (default: "INFO")
18
+ enable_output: Whether to actually output the log (default: True)
19
  """
20
+ if enable_output:
21
+ print(f"{level}: {message}", file=sys.stderr, flush=True)
22
 
23
 
24
  T = TypeVar("T")
 
38
  _storage_lock: Optional[LockType] = None
39
  _internal_lock: Optional[LockType] = None
40
  _pipeline_status_lock: Optional[LockType] = None
41
+ _graph_db_lock: Optional[LockType] = None
42
 
43
 
44
  class UnifiedLock(Generic[T]):
45
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
46
 
47
+ def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool, name: str = "unnamed", enable_logging: bool = True):
48
  self._lock = lock
49
  self._is_async = is_async
50
+ self._pid = os.getpid() # for debug only
51
+ self._name = name # for debug only
52
+ self._enable_logging = enable_logging # for debug only
53
 
54
  async def __aenter__(self) -> "UnifiedLock[T]":
55
+ try:
56
+ direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging)
57
+ if self._is_async:
58
+ await self._lock.acquire()
59
+ else:
60
+ self._lock.acquire()
61
+ direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", enable_output=self._enable_logging)
62
+ return self
63
+ except Exception as e:
64
+ direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging)
65
+ raise
66
 
67
  async def __aexit__(self, exc_type, exc_val, exc_tb):
68
+ try:
69
+ direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging)
70
+ if self._is_async:
71
+ self._lock.release()
72
+ else:
73
+ self._lock.release()
74
+ direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", enable_output=self._enable_logging)
75
+ except Exception as e:
76
+ direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging)
77
+ raise
78
 
79
  def __enter__(self) -> "UnifiedLock[T]":
80
  """For backward compatibility"""
81
+ try:
82
+ if self._is_async:
83
+ raise RuntimeError("Use 'async with' for shared_storage lock")
84
+ direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)", enable_output=self._enable_logging)
85
+ self._lock.acquire()
86
+ direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)", enable_output=self._enable_logging)
87
+ return self
88
+ except Exception as e:
89
+ direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging)
90
+ raise
91
 
92
  def __exit__(self, exc_type, exc_val, exc_tb):
93
  """For backward compatibility"""
94
+ try:
95
+ if self._is_async:
96
+ raise RuntimeError("Use 'async with' for shared_storage lock")
97
+ direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)", enable_output=self._enable_logging)
98
+ self._lock.release()
99
+ direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)", enable_output=self._enable_logging)
100
+ except Exception as e:
101
+ direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging)
102
+ raise
103
 
104
 
105
+ def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
106
  """return unified storage lock for data consistency"""
107
+ return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess, name="internal_lock", enable_logging=enable_logging)
108
 
109
 
110
+ def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
111
  """return unified storage lock for data consistency"""
112
+ return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess, name="storage_lock", enable_logging=enable_logging)
113
 
114
 
115
+ def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
116
  """return unified storage lock for data consistency"""
117
+ return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess, name="pipeline_status_lock", enable_logging=enable_logging)
118
+
119
+
120
+ def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
121
+ """return unified graph database lock for ensuring atomic operations"""
122
+ return UnifiedLock(lock=_graph_db_lock, is_async=not is_multiprocess, name="graph_db_lock", enable_logging=enable_logging)
123
 
124
 
125
  def initialize_share_data(workers: int = 1):
 
147
  _storage_lock, \
148
  _internal_lock, \
149
  _pipeline_status_lock, \
150
+ _graph_db_lock, \
151
  _shared_dicts, \
152
  _init_flags, \
153
  _initialized, \
 
168
  _internal_lock = _manager.Lock()
169
  _storage_lock = _manager.Lock()
170
  _pipeline_status_lock = _manager.Lock()
171
+ _graph_db_lock = _manager.Lock()
172
  _shared_dicts = _manager.dict()
173
  _init_flags = _manager.dict()
174
  _update_flags = _manager.dict()
 
180
  _internal_lock = asyncio.Lock()
181
  _storage_lock = asyncio.Lock()
182
  _pipeline_status_lock = asyncio.Lock()
183
+ _graph_db_lock = asyncio.Lock()
184
  _shared_dicts = {}
185
  _init_flags = {}
186
  _update_flags = {}
 
346
  _storage_lock, \
347
  _internal_lock, \
348
  _pipeline_status_lock, \
349
+ _graph_db_lock, \
350
  _shared_dicts, \
351
  _init_flags, \
352
  _initialized, \
 
412
  _storage_lock = None
413
  _internal_lock = None
414
  _pipeline_status_lock = None
415
+ _graph_db_lock = None
416
  _update_flags = None
417
 
418
  direct_log(f"Process {os.getpid()} storage data finalization complete")