yangdx committed on
Commit
c6a5b86
·
1 Parent(s): 1c21f94

feat(shared_storage): prevent event loop blocking in multiprocess mode

Browse files

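Add auxiliary per-process asyncio locks in multiprocess mode to prevent event loop blocking: a coroutine first awaits the asyncio lock and only then acquires the multiprocessing lock, so other coroutines in the same process wait on the asyncio lock instead of blocking the event loop on the multiprocessing lock.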

Files changed (1) hide show
  1. lightrag/kg/shared_storage.py +85 -2
lightrag/kg/shared_storage.py CHANGED
@@ -41,6 +41,9 @@ _pipeline_status_lock: Optional[LockType] = None
41
  _graph_db_lock: Optional[LockType] = None
42
  _data_init_lock: Optional[LockType] = None
43
 
 
 
 
44
 
45
  class UnifiedLock(Generic[T]):
46
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -51,12 +54,14 @@ class UnifiedLock(Generic[T]):
51
  is_async: bool,
52
  name: str = "unnamed",
53
  enable_logging: bool = True,
 
54
  ):
55
  self._lock = lock
56
  self._is_async = is_async
57
  self._pid = os.getpid() # for debug only
58
  self._name = name # for debug only
59
  self._enable_logging = enable_logging # for debug only
 
60
 
61
  async def __aenter__(self) -> "UnifiedLock[T]":
62
  try:
@@ -64,16 +69,35 @@ class UnifiedLock(Generic[T]):
64
  f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
65
  enable_output=self._enable_logging,
66
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  if self._is_async:
68
  await self._lock.acquire()
69
  else:
70
  self._lock.acquire()
 
71
  direct_log(
72
  f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
73
  enable_output=self._enable_logging,
74
  )
75
  return self
76
  except Exception as e:
 
 
 
 
77
  direct_log(
78
  f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
79
  level="ERROR",
@@ -82,15 +106,29 @@ class UnifiedLock(Generic[T]):
82
  raise
83
 
84
  async def __aexit__(self, exc_type, exc_val, exc_tb):
 
85
  try:
86
  direct_log(
87
  f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
88
  enable_output=self._enable_logging,
89
  )
 
 
90
  if self._is_async:
91
  self._lock.release()
92
  else:
93
  self._lock.release()
 
 
 
 
 
 
 
 
 
 
 
94
  direct_log(
95
  f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
96
  enable_output=self._enable_logging,
@@ -101,6 +139,27 @@ class UnifiedLock(Generic[T]):
101
  level="ERROR",
102
  enable_output=self._enable_logging,
103
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  raise
105
 
106
  def __enter__(self) -> "UnifiedLock[T]":
@@ -151,51 +210,61 @@ class UnifiedLock(Generic[T]):
151
 
152
  def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
153
  """return unified storage lock for data consistency"""
 
154
  return UnifiedLock(
155
  lock=_internal_lock,
156
  is_async=not is_multiprocess,
157
  name="internal_lock",
158
  enable_logging=enable_logging,
 
159
  )
160
 
161
 
162
  def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
163
  """return unified storage lock for data consistency"""
 
164
  return UnifiedLock(
165
  lock=_storage_lock,
166
  is_async=not is_multiprocess,
167
  name="storage_lock",
168
  enable_logging=enable_logging,
 
169
  )
170
 
171
 
172
  def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
173
  """return unified storage lock for data consistency"""
 
174
  return UnifiedLock(
175
  lock=_pipeline_status_lock,
176
  is_async=not is_multiprocess,
177
  name="pipeline_status_lock",
178
  enable_logging=enable_logging,
 
179
  )
180
 
181
 
182
  def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
183
  """return unified graph database lock for ensuring atomic operations"""
 
184
  return UnifiedLock(
185
  lock=_graph_db_lock,
186
  is_async=not is_multiprocess,
187
  name="graph_db_lock",
188
  enable_logging=enable_logging,
 
189
  )
190
 
191
 
192
  def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
193
  """return unified data initialization lock for ensuring atomic data initialization"""
 
194
  return UnifiedLock(
195
  lock=_data_init_lock,
196
  is_async=not is_multiprocess,
197
  name="data_init_lock",
198
  enable_logging=enable_logging,
 
199
  )
200
 
201
 
@@ -229,7 +298,8 @@ def initialize_share_data(workers: int = 1):
229
  _shared_dicts, \
230
  _init_flags, \
231
  _initialized, \
232
- _update_flags
 
233
 
234
  # Check if already initialized
235
  if _initialized:
@@ -251,6 +321,16 @@ def initialize_share_data(workers: int = 1):
251
  _shared_dicts = _manager.dict()
252
  _init_flags = _manager.dict()
253
  _update_flags = _manager.dict()
 
 
 
 
 
 
 
 
 
 
254
  direct_log(
255
  f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
256
  )
@@ -264,6 +344,7 @@ def initialize_share_data(workers: int = 1):
264
  _shared_dicts = {}
265
  _init_flags = {}
266
  _update_flags = {}
 
267
  direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
268
 
269
  # Mark as initialized
@@ -458,7 +539,8 @@ def finalize_share_data():
458
  _shared_dicts, \
459
  _init_flags, \
460
  _initialized, \
461
- _update_flags
 
462
 
463
  # Check if already initialized
464
  if not _initialized:
@@ -523,5 +605,6 @@ def finalize_share_data():
523
  _graph_db_lock = None
524
  _data_init_lock = None
525
  _update_flags = None
 
526
 
527
  direct_log(f"Process {os.getpid()} storage data finalization complete")
 
41
  _graph_db_lock: Optional[LockType] = None
42
  _data_init_lock: Optional[LockType] = None
43
 
44
+ # async locks for coroutine synchronization in multiprocess mode
45
+ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
46
+
47
 
48
  class UnifiedLock(Generic[T]):
49
  """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
 
54
  is_async: bool,
55
  name: str = "unnamed",
56
  enable_logging: bool = True,
57
+ async_lock: Optional[asyncio.Lock] = None,
58
  ):
59
  self._lock = lock
60
  self._is_async = is_async
61
  self._pid = os.getpid() # for debug only
62
  self._name = name # for debug only
63
  self._enable_logging = enable_logging # for debug only
64
+ self._async_lock = async_lock # auxiliary lock for coroutine synchronization
65
 
66
  async def __aenter__(self) -> "UnifiedLock[T]":
67
  try:
 
69
  f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
70
  enable_output=self._enable_logging,
71
  )
72
+
73
+ # If in multiprocess mode and async lock exists, acquire it first
74
+ if not self._is_async and self._async_lock is not None:
75
+ direct_log(
76
+ f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
77
+ enable_output=self._enable_logging,
78
+ )
79
+ await self._async_lock.acquire()
80
+ direct_log(
81
+ f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
82
+ enable_output=self._enable_logging,
83
+ )
84
+
85
+ # Then acquire the main lock
86
  if self._is_async:
87
  await self._lock.acquire()
88
  else:
89
  self._lock.acquire()
90
+
91
  direct_log(
92
  f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
93
  enable_output=self._enable_logging,
94
  )
95
  return self
96
  except Exception as e:
97
+ # If main lock acquisition fails, release the async lock if it was acquired
98
+ if not self._is_async and self._async_lock is not None and self._async_lock.locked():
99
+ self._async_lock.release()
100
+
101
  direct_log(
102
  f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
103
  level="ERROR",
 
106
  raise
107
 
108
  async def __aexit__(self, exc_type, exc_val, exc_tb):
109
+ main_lock_released = False
110
  try:
111
  direct_log(
112
  f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
113
  enable_output=self._enable_logging,
114
  )
115
+
116
+ # Release main lock first
117
  if self._is_async:
118
  self._lock.release()
119
  else:
120
  self._lock.release()
121
+
122
+ main_lock_released = True
123
+
124
+ # Then release async lock if in multiprocess mode
125
+ if not self._is_async and self._async_lock is not None:
126
+ direct_log(
127
+ f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'",
128
+ enable_output=self._enable_logging,
129
+ )
130
+ self._async_lock.release()
131
+
132
  direct_log(
133
  f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
134
  enable_output=self._enable_logging,
 
139
  level="ERROR",
140
  enable_output=self._enable_logging,
141
  )
142
+
143
+ # If main lock release failed but async lock hasn't been released, try to release it
144
+ if not main_lock_released and not self._is_async and self._async_lock is not None:
145
+ try:
146
+ direct_log(
147
+ f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
148
+ level="WARNING",
149
+ enable_output=self._enable_logging,
150
+ )
151
+ self._async_lock.release()
152
+ direct_log(
153
+ f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
154
+ enable_output=self._enable_logging,
155
+ )
156
+ except Exception as inner_e:
157
+ direct_log(
158
+ f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
159
+ level="ERROR",
160
+ enable_output=self._enable_logging,
161
+ )
162
+
163
  raise
164
 
165
  def __enter__(self) -> "UnifiedLock[T]":
 
210
 
211
  def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
212
  """return unified storage lock for data consistency"""
213
+ async_lock = _async_locks.get("internal_lock") if is_multiprocess else None
214
  return UnifiedLock(
215
  lock=_internal_lock,
216
  is_async=not is_multiprocess,
217
  name="internal_lock",
218
  enable_logging=enable_logging,
219
+ async_lock=async_lock,
220
  )
221
 
222
 
223
  def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
224
  """return unified storage lock for data consistency"""
225
+ async_lock = _async_locks.get("storage_lock") if is_multiprocess else None
226
  return UnifiedLock(
227
  lock=_storage_lock,
228
  is_async=not is_multiprocess,
229
  name="storage_lock",
230
  enable_logging=enable_logging,
231
+ async_lock=async_lock,
232
  )
233
 
234
 
235
  def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
236
  """return unified storage lock for data consistency"""
237
+ async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None
238
  return UnifiedLock(
239
  lock=_pipeline_status_lock,
240
  is_async=not is_multiprocess,
241
  name="pipeline_status_lock",
242
  enable_logging=enable_logging,
243
+ async_lock=async_lock,
244
  )
245
 
246
 
247
  def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
248
  """return unified graph database lock for ensuring atomic operations"""
249
+ async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None
250
  return UnifiedLock(
251
  lock=_graph_db_lock,
252
  is_async=not is_multiprocess,
253
  name="graph_db_lock",
254
  enable_logging=enable_logging,
255
+ async_lock=async_lock,
256
  )
257
 
258
 
259
  def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
260
  """return unified data initialization lock for ensuring atomic data initialization"""
261
+ async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None
262
  return UnifiedLock(
263
  lock=_data_init_lock,
264
  is_async=not is_multiprocess,
265
  name="data_init_lock",
266
  enable_logging=enable_logging,
267
+ async_lock=async_lock,
268
  )
269
 
270
 
 
298
  _shared_dicts, \
299
  _init_flags, \
300
  _initialized, \
301
+ _update_flags, \
302
+ _async_locks
303
 
304
  # Check if already initialized
305
  if _initialized:
 
321
  _shared_dicts = _manager.dict()
322
  _init_flags = _manager.dict()
323
  _update_flags = _manager.dict()
324
+
325
+ # Initialize async locks for multiprocess mode
326
+ _async_locks = {
327
+ "internal_lock": asyncio.Lock(),
328
+ "storage_lock": asyncio.Lock(),
329
+ "pipeline_status_lock": asyncio.Lock(),
330
+ "graph_db_lock": asyncio.Lock(),
331
+ "data_init_lock": asyncio.Lock(),
332
+ }
333
+
334
  direct_log(
335
  f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
336
  )
 
344
  _shared_dicts = {}
345
  _init_flags = {}
346
  _update_flags = {}
347
+ _async_locks = None # No need for async locks in single process mode
348
  direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
349
 
350
  # Mark as initialized
 
539
  _shared_dicts, \
540
  _init_flags, \
541
  _initialized, \
542
+ _update_flags, \
543
+ _async_locks
544
 
545
  # Check if already initialized
546
  if not _initialized:
 
605
  _graph_db_lock = None
606
  _data_init_lock = None
607
  _update_flags = None
608
+ _async_locks = None
609
 
610
  direct_log(f"Process {os.getpid()} storage data finalization complete")