yangdx commited on
Commit
76d72c8
·
1 Parent(s): 556f361

Refactor shared storage to support both single and multi-process modes

Browse files

• Initialize storage based on worker count
• Remove redundant global variable checks
• Add explicit mutex initialization
• Centralize shared storage initialization
• Fix process/thread lock selection logic

lightrag/api/lightrag_server.py CHANGED
@@ -96,10 +96,6 @@ def create_app(args):
96
  logger.setLevel(getattr(logging, args.log_level))
97
  set_verbose_debug(args.verbose)
98
 
99
- from lightrag.kg.shared_storage import is_multiprocess
100
-
101
- logger.info(f"==== Multi-processor mode: {is_multiprocess} ====")
102
-
103
  # Verify that bindings are correctly setup
104
  if args.llm_binding not in [
105
  "lollms",
@@ -422,11 +418,6 @@ def get_application():
422
 
423
  args = types.SimpleNamespace(**json.loads(args_json))
424
 
425
- if args.workers > 1:
426
- from lightrag.kg.shared_storage import initialize_share_data
427
-
428
- initialize_share_data()
429
-
430
  return create_app(args)
431
 
432
 
@@ -492,6 +483,9 @@ def main():
492
 
493
  display_splash_screen(args)
494
 
 
 
 
495
  uvicorn_config = {
496
  "app": "lightrag.api.lightrag_server:get_application",
497
  "factory": True,
 
96
  logger.setLevel(getattr(logging, args.log_level))
97
  set_verbose_debug(args.verbose)
98
 
 
 
 
 
99
  # Verify that bindings are correctly setup
100
  if args.llm_binding not in [
101
  "lollms",
 
418
 
419
  args = types.SimpleNamespace(**json.loads(args_json))
420
 
 
 
 
 
 
421
  return create_app(args)
422
 
423
 
 
483
 
484
  display_splash_screen(args)
485
 
486
+ from lightrag.kg.shared_storage import initialize_share_data
487
+ initialize_share_data(args.workers)
488
+
489
  uvicorn_config = {
490
  "app": "lightrag.api.lightrag_server:get_application",
491
  "factory": True,
lightrag/kg/shared_storage.py CHANGED
@@ -7,35 +7,50 @@ from lightrag.utils import logger
7
 
8
  LockType = Union[ProcessLock, ThreadLock]
9
 
10
- is_multiprocess = False
11
-
12
  _manager = None
13
- _global_lock: Optional[LockType] = None
 
 
14
 
15
  # shared data for storage across processes
16
- _shared_dicts: Optional[Dict[str, Any]] = {}
17
- _share_objects: Optional[Dict[str, Any]] = {}
18
  _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
19
 
 
20
 
21
- def initialize_share_data():
22
- """Initialize shared data, only called if multiple processes where workers > 1"""
23
- global _manager, _shared_dicts, _share_objects, _init_flags, is_multiprocess
24
- is_multiprocess = True
25
-
26
- logger.info(f"Process {os.getpid()} initializing shared storage")
27
-
28
- # Initialize manager
29
- if _manager is None:
30
- _manager = Manager()
31
- logger.info(f"Process {os.getpid()} created manager")
32
 
33
- # Create shared dictionaries with manager
34
- _shared_dicts = _manager.dict()
35
- _share_objects = _manager.dict()
36
- _init_flags = _manager.dict() # 使用共享字典存储初始化标志
37
- logger.info(f"Process {os.getpid()} created shared dictionaries")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38
 
 
39
 
40
  def try_initialize_namespace(namespace: str) -> bool:
41
  """
@@ -44,7 +59,7 @@ def try_initialize_namespace(namespace: str) -> bool:
44
  """
45
  global _init_flags, _manager
46
 
47
- if is_multiprocess:
48
  if _init_flags is None:
49
  raise RuntimeError(
50
  "Shared storage not initialized. Call initialize_share_data() first."
@@ -55,17 +70,13 @@ def try_initialize_namespace(namespace: str) -> bool:
55
 
56
  logger.info(f"Process {os.getpid()} trying to initialize namespace {namespace}")
57
 
58
- # 使用全局锁保护共享字典的访问
59
- with _get_global_lock():
60
- # 检查是否已经初始化
61
  if namespace not in _init_flags:
62
- # 设置初始化标志
63
  _init_flags[namespace] = True
64
  logger.info(
65
  f"Process {os.getpid()} ready to initialize namespace {namespace}"
66
  )
67
  return True
68
-
69
  logger.info(
70
  f"Process {os.getpid()} found namespace {namespace} already initialized"
71
  )
@@ -73,14 +84,6 @@ def try_initialize_namespace(namespace: str) -> bool:
73
 
74
 
75
  def _get_global_lock() -> LockType:
76
- global _global_lock, is_multiprocess, _manager
77
-
78
- if _global_lock is None:
79
- if is_multiprocess:
80
- _global_lock = _manager.Lock() # Use manager for lock
81
- else:
82
- _global_lock = ThreadLock()
83
-
84
  return _global_lock
85
 
86
 
@@ -96,36 +99,20 @@ def get_scan_lock() -> LockType:
96
 
97
  def get_namespace_object(namespace: str) -> Any:
98
  """Get an object for specific namespace"""
99
- global _share_objects, is_multiprocess, _manager
100
-
101
- if is_multiprocess and not _manager:
102
- raise RuntimeError(
103
- "Multiprocess mode detected but shared storage not initialized. Call initialize_share_data() first."
104
- )
105
 
106
  if namespace not in _share_objects:
107
  lock = _get_global_lock()
108
  with lock:
109
  if namespace not in _share_objects:
110
- if is_multiprocess:
111
  _share_objects[namespace] = _manager.Value("O", None)
112
  else:
113
  _share_objects[namespace] = None
114
 
115
  return _share_objects[namespace]
116
 
117
-
118
- # 移除不再使用的函数
119
-
120
-
121
  def get_namespace_data(namespace: str) -> Dict[str, Any]:
122
  """get storage space for specific storage type(namespace)"""
123
- global _shared_dicts, is_multiprocess, _manager
124
-
125
- if is_multiprocess and not _manager:
126
- raise RuntimeError(
127
- "Multiprocess mode detected but shared storage not initialized. Call initialize_share_data() first."
128
- )
129
 
130
  if namespace not in _shared_dicts:
131
  lock = _get_global_lock()
 
7
 
8
  LockType = Union[ProcessLock, ThreadLock]
9
 
 
 
10
  _manager = None
11
+ _initialized = None
12
+ _is_multiprocess = None
13
+ is_multiprocess = None
14
 
15
  # shared data for storage across processes
16
+ _shared_dicts: Optional[Dict[str, Any]] = None
17
+ _share_objects: Optional[Dict[str, Any]] = None
18
  _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
19
 
20
+ _global_lock: Optional[LockType] = None
21
 
 
 
 
 
 
 
 
 
 
 
 
22
 
23
+ def initialize_share_data(workers: int = 1):
24
+ """Initialize storage data"""
25
+ global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
26
+
27
+ if _initialized and _initialized.value:
28
+ is_multiprocess = _is_multiprocess.value
29
+ if _is_multiprocess.value:
30
+ logger.info(f"Process {os.getpid()} storage data already initialized!")
31
+ return
32
+
33
+ _manager = Manager()
34
+ _initialized = _manager.Value("b", False)
35
+ _is_multiprocess = _manager.Value("b", False)
36
+
37
+ if workers == 1:
38
+ _is_multiprocess.value = False
39
+ _global_lock = ThreadLock()
40
+ _shared_dicts = {}
41
+ _share_objects = {}
42
+ _init_flags = {}
43
+ logger.info(f"Process {os.getpid()} storage data created for Single Process")
44
+ else:
45
+ _is_multiprocess.value = True
46
+ _global_lock = _manager.Lock()
47
+ # Create shared dictionaries with manager
48
+ _shared_dicts = _manager.dict()
49
+ _share_objects = _manager.dict()
50
+ _init_flags = _manager.dict() # 使用共享字典存储初始化标志
51
+ logger.info(f"Process {os.getpid()} storage data created for Multiple Process")
52
 
53
+ is_multiprocess = _is_multiprocess.value
54
 
55
  def try_initialize_namespace(namespace: str) -> bool:
56
  """
 
59
  """
60
  global _init_flags, _manager
61
 
62
+ if _is_multiprocess.value:
63
  if _init_flags is None:
64
  raise RuntimeError(
65
  "Shared storage not initialized. Call initialize_share_data() first."
 
70
 
71
  logger.info(f"Process {os.getpid()} trying to initialize namespace {namespace}")
72
 
73
+ with _global_lock:
 
 
74
  if namespace not in _init_flags:
 
75
  _init_flags[namespace] = True
76
  logger.info(
77
  f"Process {os.getpid()} ready to initialize namespace {namespace}"
78
  )
79
  return True
 
80
  logger.info(
81
  f"Process {os.getpid()} found namespace {namespace} already initialized"
82
  )
 
84
 
85
 
86
  def _get_global_lock() -> LockType:
 
 
 
 
 
 
 
 
87
  return _global_lock
88
 
89
 
 
99
 
100
  def get_namespace_object(namespace: str) -> Any:
101
  """Get an object for specific namespace"""
 
 
 
 
 
 
102
 
103
  if namespace not in _share_objects:
104
  lock = _get_global_lock()
105
  with lock:
106
  if namespace not in _share_objects:
107
+ if _is_multiprocess.value:
108
  _share_objects[namespace] = _manager.Value("O", None)
109
  else:
110
  _share_objects[namespace] = None
111
 
112
  return _share_objects[namespace]
113
 
 
 
 
 
114
  def get_namespace_data(namespace: str) -> Dict[str, Any]:
115
  """get storage space for specific storage type(namespace)"""
 
 
 
 
 
 
116
 
117
  if namespace not in _shared_dicts:
118
  lock = _get_global_lock()
lightrag/lightrag.py CHANGED
@@ -267,6 +267,9 @@ class LightRAG:
267
  _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)
268
 
269
  def __post_init__(self):
 
 
 
270
  os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)
271
  set_logger(self.log_file_path, self.log_level)
272
  logger.info(f"Logger initialized for working directory: {self.working_dir}")
 
267
  _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)
268
 
269
  def __post_init__(self):
270
+ from lightrag.kg.shared_storage import initialize_share_data
271
+ initialize_share_data()
272
+
273
  os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)
274
  set_logger(self.log_file_path, self.log_level)
275
  logger.info(f"Logger initialized for working directory: {self.working_dir}")