yangdx committed
Commit · 9aa18ad
1 Parent(s): 3a23e78

Fix shared storage update status handling for in-memory storage

Files changed:
- lightrag/kg/faiss_impl.py +4 -13
- lightrag/kg/nano_vector_db_impl.py +5 -14
- lightrag/kg/networkx_impl.py +4 -13
- lightrag/kg/shared_storage.py +26 -34
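
What the change amounts to: all three in-memory backends (FAISS, NanoVectorDB, NetworkX) previously branched on `is_multiprocess` when checking and resetting their `storage_updated` flag, and in the single-worker branch they rebound the attribute to a plain `False`. After this commit the flag is always an object exposing a boolean `.value`, so the backends read and reset `storage_updated.value` unconditionally. The following is a minimal, self-contained sketch of that pattern; the class names and the print-based reload/persist stubs are illustrative only, not LightRAG code.

```python
# Sketch of the flag-handling pattern the in-memory backends converge on.
# MutableBoolean and InMemoryStorageSketch are illustrative stand-ins, not LightRAG classes.
import asyncio


class MutableBoolean:
    """Single-process stand-in for multiprocessing.Manager().Value('b', False)."""

    def __init__(self, value: bool = False) -> None:
        self.value = value


class InMemoryStorageSketch:
    def __init__(self, namespace: str, update_flag: MutableBoolean) -> None:
        self.namespace = namespace
        self.storage_updated = update_flag    # shared flag object, never rebound
        self._storage_lock = asyncio.Lock()
        self._data: dict[str, str] = {}

    async def get_client(self) -> dict[str, str]:
        async with self._storage_lock:
            if self.storage_updated.value:            # another writer signalled an update
                print(f"[{self.namespace}] reloading before read")
                self.storage_updated.value = False    # reset in place (the fix)
        return self._data

    async def index_done_callback(self) -> bool:
        async with self._storage_lock:
            if self.storage_updated.value:
                print(f"[{self.namespace}] updated elsewhere, skip saving stale data")
                return False
            # ... persist self._data and notify the other flag holders here ...
            return True


async def main() -> None:
    flag = MutableBoolean()
    storage = InMemoryStorageSketch("chunks", flag)
    flag.value = True                           # simulate another process setting the flag
    await storage.get_client()                  # triggers the reload branch, resets the flag
    print(await storage.index_done_callback())  # True: safe to persist


asyncio.run(main())
```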
lightrag/kg/faiss_impl.py
CHANGED
@@ -19,7 +19,6 @@ from .shared_storage import (
     get_storage_lock,
     get_update_flag,
     set_all_update_flags,
-    is_multiprocess,
 )
 
 
@@ -73,9 +72,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
         # Acquire lock to prevent concurrent read and write
         async with self._storage_lock:
             # Check if storage was updated by another process
-            if (is_multiprocess and self.storage_updated.value) or (
-                not is_multiprocess and self.storage_updated
-            ):
+            if self.storage_updated.value:
                 logger.info(
                     f"Process {os.getpid()} FAISS reloading {self.namespace} due to update by another process"
                 )
@@ -83,10 +80,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 self._index = faiss.IndexFlatIP(self._dim)
                 self._id_to_meta = {}
                 self._load_faiss_index()
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
         return self._index
 
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
@@ -345,7 +339,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
             # Check if storage was updated by another process
-            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
+            if self.storage_updated.value:
                 # Storage was updated by another process, reload data instead of saving
                 logger.warning(
                     f"Storage for FAISS {self.namespace} was updated by another process, reloading..."
@@ -365,10 +359,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 # Notify other processes that data has been updated
                 await set_all_update_flags(self.namespace)
                 # Reset own update flag to avoid self-reloading
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
             except Exception as e:
                 logger.error(f"Error saving FAISS index for {self.namespace}: {e}")
                 return False  # Return error
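The four-line blocks removed in this file are where the bug lived: in the single-worker branch the flag was reset by rebinding (`self.storage_updated = False`), which replaces the shared flag object handed out by `get_update_flag` with a plain bool, so later signals from `set_all_update_flags` are never seen by that instance. A standalone illustration of rebinding versus in-place mutation (hypothetical names, not LightRAG code):

```python
# Rebinding vs. in-place mutation of a shared flag object (hypothetical names).
class Flag:
    """Stand-in for the mutable flag returned by get_update_flag()."""

    def __init__(self) -> None:
        self.value = False


registry = {"chunks": Flag()}        # shared_storage keeps one flag per namespace holder

# Buggy reset: the local name is rebound to a plain bool, detaching it from the registry.
storage_updated = registry["chunks"]
storage_updated = False
registry["chunks"].value = True      # another writer signals an update...
print(storage_updated)               # False: the signal is never observed by this holder

# Fixed reset: mutate the shared object, so every holder keeps observing the same state.
storage_updated = registry["chunks"]
registry["chunks"].value = True      # another writer signals an update...
print(storage_updated.value)         # True: observed
storage_updated.value = False        # reset without breaking the shared reference
print(registry["chunks"].value)      # False: the reset is visible everywhere as well
```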
lightrag/kg/nano_vector_db_impl.py
CHANGED
@@ -20,7 +20,6 @@ from .shared_storage import (
     get_storage_lock,
     get_update_flag,
     set_all_update_flags,
-    is_multiprocess,
 )
 
 
@@ -57,16 +56,14 @@ class NanoVectorDBStorage(BaseVectorStorage):
         # Get the update flag for cross-process update notification
         self.storage_updated = await get_update_flag(self.namespace)
         # Get the storage lock for use in other methods
-        self._storage_lock = get_storage_lock()
+        self._storage_lock = get_storage_lock(enable_logging=False)
 
     async def _get_client(self):
         """Check if the storage should be reloaded"""
         # Acquire lock to prevent concurrent read and write
         async with self._storage_lock:
             # Check if data needs to be reloaded
-            if (is_multiprocess and self.storage_updated.value) or (
-                not is_multiprocess and self.storage_updated
-            ):
+            if self.storage_updated.value:
                 logger.info(
                     f"Process {os.getpid()} reloading {self.namespace} due to update by another process"
                 )
@@ -76,10 +73,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
                     storage_file=self._client_file_name,
                 )
                 # Reset update flag
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
 
         return self._client
 
@@ -208,7 +202,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
         """Save data to disk"""
         async with self._storage_lock:
             # Check if storage was updated by another process
-            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
+            if self.storage_updated.value:
                 # Storage was updated by another process, reload data instead of saving
                 logger.warning(
                     f"Storage for {self.namespace} was updated by another process, reloading..."
@@ -229,10 +223,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
                 # Notify other processes that data has been updated
                 await set_all_update_flags(self.namespace)
                 # Reset own update flag to avoid self-reloading
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
                 return True  # Return success
             except Exception as e:
                 logger.error(f"Error saving data for {self.namespace}: {e}")
lightrag/kg/networkx_impl.py
CHANGED
@@ -21,7 +21,6 @@ from .shared_storage import (
     get_storage_lock,
     get_update_flag,
     set_all_update_flags,
-    is_multiprocess,
 )
 
 MAX_GRAPH_NODES = int(os.getenv("MAX_GRAPH_NODES", 1000))
@@ -110,9 +109,7 @@ class NetworkXStorage(BaseGraphStorage):
         # Acquire lock to prevent concurrent read and write
         async with self._storage_lock:
             # Check if data needs to be reloaded
-            if (is_multiprocess and self.storage_updated.value) or (
-                not is_multiprocess and self.storage_updated
-            ):
+            if self.storage_updated.value:
                 logger.info(
                     f"Process {os.getpid()} reloading graph {self.namespace} due to update by another process"
                 )
@@ -121,10 +118,7 @@ class NetworkXStorage(BaseGraphStorage):
                     NetworkXStorage.load_nx_graph(self._graphml_xml_file) or nx.Graph()
                 )
                 # Reset update flag
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
 
         return self._graph
 
@@ -403,7 +397,7 @@ class NetworkXStorage(BaseGraphStorage):
         """Save data to disk"""
         async with self._storage_lock:
             # Check if storage was updated by another process
-            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
+            if self.storage_updated.value:
                 # Storage was updated by another process, reload data instead of saving
                 logger.warning(
                     f"Graph for {self.namespace} was updated by another process, reloading..."
@@ -423,10 +417,7 @@ class NetworkXStorage(BaseGraphStorage):
                 # Notify other processes that data has been updated
                 await set_all_update_flags(self.namespace)
                 # Reset own update flag to avoid self-reloading
-                if is_multiprocess:
-                    self.storage_updated.value = False
-                else:
-                    self.storage_updated = False
+                self.storage_updated.value = False
                 return True  # Return success
             except Exception as e:
                 logger.error(f"Error saving graph for {self.namespace}: {e}")
lightrag/kg/shared_storage.py
CHANGED
@@ -24,7 +24,7 @@ def direct_log(message, level="INFO", enable_output: bool = True):
 T = TypeVar("T")
 LockType = Union[ProcessLock, asyncio.Lock]
 
-is_multiprocess = None
+_is_multiprocess = None
 _workers = None
 _manager = None
 _initialized = None
@@ -218,10 +218,10 @@ class UnifiedLock(Generic[T]):
 
 def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
-    async_lock = _async_locks.get("internal_lock") if is_multiprocess else None
+    async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None
     return UnifiedLock(
         lock=_internal_lock,
-        is_async=not is_multiprocess,
+        is_async=not _is_multiprocess,
         name="internal_lock",
         enable_logging=enable_logging,
         async_lock=async_lock,
@@ -230,10 +230,10 @@ def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
 
 def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
-    async_lock = _async_locks.get("storage_lock") if is_multiprocess else None
+    async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None
     return UnifiedLock(
         lock=_storage_lock,
-        is_async=not is_multiprocess,
+        is_async=not _is_multiprocess,
         name="storage_lock",
         enable_logging=enable_logging,
         async_lock=async_lock,
@@ -242,10 +242,10 @@ def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
 
 def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
-    async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None
+    async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None
    return UnifiedLock(
         lock=_pipeline_status_lock,
-        is_async=not is_multiprocess,
+        is_async=not _is_multiprocess,
         name="pipeline_status_lock",
         enable_logging=enable_logging,
         async_lock=async_lock,
@@ -254,10 +254,10 @@ def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
 
 def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified graph database lock for ensuring atomic operations"""
-    async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None
+    async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None
     return UnifiedLock(
         lock=_graph_db_lock,
-        is_async=not is_multiprocess,
+        is_async=not _is_multiprocess,
         name="graph_db_lock",
         enable_logging=enable_logging,
         async_lock=async_lock,
@@ -266,10 +266,10 @@ def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
 
 def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified data initialization lock for ensuring atomic data initialization"""
-    async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None
+    async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None
     return UnifiedLock(
         lock=_data_init_lock,
-        is_async=not is_multiprocess,
+        is_async=not _is_multiprocess,
         name="data_init_lock",
         enable_logging=enable_logging,
         async_lock=async_lock,
@@ -297,7 +297,7 @@ def initialize_share_data(workers: int = 1):
     global \
         _manager, \
         _workers, \
-        is_multiprocess, \
+        _is_multiprocess, \
         _storage_lock, \
         _internal_lock, \
         _pipeline_status_lock, \
@@ -312,14 +312,14 @@ def initialize_share_data(workers: int = 1):
     # Check if already initialized
     if _initialized:
         direct_log(
-            f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})"
+            f"Process {os.getpid()} Shared-Data already initialized (multiprocess={_is_multiprocess})"
         )
         return
 
     _workers = workers
 
     if workers > 1:
-        is_multiprocess = True
+        _is_multiprocess = True
         _manager = Manager()
         _internal_lock = _manager.Lock()
         _storage_lock = _manager.Lock()
@@ -343,7 +343,7 @@ def initialize_share_data(workers: int = 1):
             f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
         )
     else:
-        is_multiprocess = False
+        _is_multiprocess = False
         _internal_lock = asyncio.Lock()
         _storage_lock = asyncio.Lock()
         _pipeline_status_lock = asyncio.Lock()
@@ -372,7 +372,7 @@ async def initialize_pipeline_status():
         return
 
     # Create a shared list object for history_messages
-    history_messages = _manager.list() if is_multiprocess else []
+    history_messages = _manager.list() if _is_multiprocess else []
     pipeline_namespace.update(
         {
             "autoscanned": False,  # Auto-scan started
@@ -401,7 +401,7 @@ async def get_update_flag(namespace: str):
 
     async with get_internal_lock():
         if namespace not in _update_flags:
-            if is_multiprocess and _manager is not None:
+            if _is_multiprocess and _manager is not None:
                 _update_flags[namespace] = _manager.list()
             else:
                 _update_flags[namespace] = []
@@ -409,7 +409,7 @@ async def get_update_flag(namespace: str):
                 f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]"
             )
 
-        if is_multiprocess and _manager is not None:
+        if _is_multiprocess and _manager is not None:
            new_update_flag = _manager.Value("b", False)
        else:
            # Create a simple mutable object to store boolean value for compatibility with mutiprocess
@@ -434,11 +434,7 @@ async def set_all_update_flags(namespace: str):
         raise ValueError(f"Namespace {namespace} not found in update flags")
     # Update flags for both modes
     for i in range(len(_update_flags[namespace])):
-        if is_multiprocess:
-            _update_flags[namespace][i].value = True
-        else:
-            # Use .value attribute instead of direct assignment
-            _update_flags[namespace][i].value = True
+        _update_flags[namespace][i].value = True
 
 
 async def clear_all_update_flags(namespace: str):
@@ -452,11 +448,7 @@ async def clear_all_update_flags(namespace: str):
         raise ValueError(f"Namespace {namespace} not found in update flags")
     # Update flags for both modes
     for i in range(len(_update_flags[namespace])):
-        if is_multiprocess:
-            _update_flags[namespace][i].value = False
-        else:
-            # Use .value attribute instead of direct assignment
-            _update_flags[namespace][i].value = False
+        _update_flags[namespace][i].value = False
 
 
 async def get_all_update_flags_status() -> Dict[str, list]:
@@ -474,7 +466,7 @@ async def get_all_update_flags_status() -> Dict[str, list]:
     for namespace, flags in _update_flags.items():
         worker_statuses = []
         for flag in flags:
-            if is_multiprocess:
+            if _is_multiprocess:
                 worker_statuses.append(flag.value)
             else:
                 worker_statuses.append(flag)
@@ -518,7 +510,7 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
 
     async with get_internal_lock():
         if namespace not in _shared_dicts:
-            if is_multiprocess and _manager is not None:
+            if _is_multiprocess and _manager is not None:
                 _shared_dicts[namespace] = _manager.dict()
             else:
                 _shared_dicts[namespace] = {}
@@ -538,7 +530,7 @@ def finalize_share_data():
     """
     global \
         _manager, \
-        is_multiprocess, \
+        _is_multiprocess, \
         _storage_lock, \
         _internal_lock, \
         _pipeline_status_lock, \
@@ -558,11 +550,11 @@ def finalize_share_data():
         return
 
     direct_log(
-        f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})"
+        f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
     )
 
     # In multi-process mode, shut down the Manager
-    if is_multiprocess and _manager is not None:
+    if _is_multiprocess and _manager is not None:
         try:
             # Clear shared resources before shutting down Manager
             if _shared_dicts is not None:
@@ -604,7 +596,7 @@ def finalize_share_data():
     # Reset global variables
     _manager = None
     _initialized = None
-    is_multiprocess = None
+    _is_multiprocess = None
     _shared_dicts = None
     _init_flags = None
     _storage_lock = None