yangdx committed
Commit e5f9f74
Parent: 7fb023a

Refactor storage initialization to separate object creation from data loading


• Split __post_init__ and initialize()
• Move data loading to initialize()
• Add FastAPI lifespan integration
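Taken together, these bullets describe a two-phase initialization pattern: `__post_init__` stays synchronous and only assigns attributes, while the new async `initialize()` performs the actual data loading once an event loop is available. A minimal sketch of the pattern (illustrative names, not code from this commit):

```python
from dataclasses import dataclass

@dataclass
class TwoPhaseStorage:
    namespace: str

    def __post_init__(self):
        # Phase 1 (sync): cheap attribute setup only, safe to run in a
        # plain constructor where no event loop exists yet.
        self._data = None

    async def initialize(self):
        # Phase 2 (async): deferred data loading, run from async startup
        # code such as a FastAPI lifespan handler.
        if self._data is None:
            self._data = await self._load()

    async def _load(self) -> dict:
        # Placeholder for real I/O (reading a JSON file, opening a DB, ...).
        return {}
```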

lightrag/api/lightrag_server.py CHANGED
```diff
@@ -135,14 +135,16 @@ def create_app(args):
     # Initialize database connections
     await rag.initialize_storages()
 
-    # Auto scan documents if enabled
-    if args.auto_scan_at_startup:
-        # Import necessary functions from shared_storage
-        from lightrag.kg.shared_storage import (
-            get_namespace_data,
-            get_storage_lock,
-        )
+    # Import necessary functions from shared_storage
+    from lightrag.kg.shared_storage import (
+        get_namespace_data,
+        get_storage_lock,
+        initialize_pipeline_namespace,
+    )
+    await initialize_pipeline_namespace()
 
+    # Auto scan documents if enabled
+    if args.auto_scan_at_startup:
         # Check if a task is already running (with lock protection)
         pipeline_status = await get_namespace_data("pipeline_status")
         should_start_task = False
```
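This block runs inside the server's startup path, and the third commit bullet says it is wired through FastAPI's lifespan mechanism. A hedged sketch of that shape (simplified; the real `create_app` sets up much more state):

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

def create_app(rag):
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: open storages first, then make sure the shared
        # pipeline_status namespace exists before anything reads it.
        await rag.initialize_storages()
        from lightrag.kg.shared_storage import initialize_pipeline_namespace
        await initialize_pipeline_namespace()
        yield
        # Shutdown: storage teardown would go here.

    return FastAPI(lifespan=lifespan)
```

The ordering matters: `initialize_pipeline_namespace()` must run before the auto-scan branch calls `get_namespace_data("pipeline_status")`, which is exactly why the import-and-initialize block moved out of the `if args.auto_scan_at_startup:` body.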
lightrag/kg/json_doc_status_impl.py CHANGED
```diff
@@ -24,11 +24,14 @@ from .shared_storage import (
 class JsonDocStatusStorage(DocStatusStorage):
     """JSON implementation of document status storage"""
 
-    async def __post_init__(self):
+    def __post_init__(self):
         working_dir = self.global_config["working_dir"]
         self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
         self._storage_lock = get_storage_lock()
+        self._data = None
 
+    async def initialize(self):
+        """Initialize storage data"""
         # check need_init must before get_namespace_data
         need_init = try_initialize_namespace(self.namespace)
         self._data = await get_namespace_data(self.namespace)
```
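With the split in place, constructing `JsonDocStatusStorage` no longer awaits anything; callers must await `initialize()` before first use. A usage sketch (constructor arguments and the document id are illustrative):

```python
from lightrag.kg.json_doc_status_impl import JsonDocStatusStorage

async def setup():
    storage = JsonDocStatusStorage(
        namespace="doc_status",                          # illustrative
        global_config={"working_dir": "./rag_storage"},  # illustrative
        embedding_func=None,
    )
    await storage.initialize()                # binds _data to the shared namespace
    doc = await storage.get_by_id("doc-123")  # safe only after initialize()
```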
lightrag/kg/json_kv_impl.py CHANGED
```diff
@@ -20,11 +20,14 @@ from .shared_storage import (
 @final
 @dataclass
 class JsonKVStorage(BaseKVStorage):
-    async def __post_init__(self):
+    def __post_init__(self):
         working_dir = self.global_config["working_dir"]
         self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
         self._storage_lock = get_storage_lock()
+        self._data = None
 
+    async def initialize(self):
+        """Initialize storage data"""
         # check need_init must before get_namespace_data
         need_init = try_initialize_namespace(self.namespace)
         self._data = await get_namespace_data(self.namespace)
```
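The comment "check need_init must before get_namespace_data" is there because `try_initialize_namespace` decides whether this process is the one responsible for populating the namespace. A single-process sketch of that contract (the real `shared_storage` also handles multiprocess managers and locking):

```python
_namespaces: dict[str, dict] = {}

def try_initialize_namespace(namespace: str) -> bool:
    """Return True exactly once per namespace; the caller that sees True
    must populate the namespace (e.g. by loading its JSON file)."""
    if namespace not in _namespaces:
        _namespaces[namespace] = {}
        return True
    return False

async def get_namespace_data(namespace: str) -> dict:
    # Hands back the (possibly still empty) shared dict for the namespace.
    return _namespaces[namespace]
```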
lightrag/kg/nano_vector_db_impl.py CHANGED
```diff
@@ -16,15 +16,16 @@ if not pm.is_installed("nano-vectordb"):
     pm.install("nano-vectordb")
 
 from nano_vectordb import NanoVectorDB
-from threading import Lock as ThreadLock
+from .shared_storage import get_storage_lock
 
 
 @final
 @dataclass
 class NanoVectorDBStorage(BaseVectorStorage):
     def __post_init__(self):
-        # Initialize lock only for file operations
-        self._storage_lock = ThreadLock()
+        # Initialize basic attributes
+        self._storage_lock = get_storage_lock()
+        self._client = None
 
         # Use global config value if specified, otherwise use default
         kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
@@ -40,7 +41,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
         )
         self._max_batch_size = self.global_config["embedding_batch_num"]
 
-        with self._storage_lock:
+    async def initialize(self):
+        """Initialize storage data"""
+        async with self._storage_lock:
             self._client = NanoVectorDB(
                 self.embedding_func.embedding_dim,
                 storage_file=self._client_file_name,
@@ -163,5 +166,5 @@ class NanoVectorDBStorage(BaseVectorStorage):
             logger.error(f"Error deleting relations for {entity_name}: {e}")
 
     async def index_done_callback(self) -> None:
-        with self._storage_lock:
+        async with self._storage_lock:
             self._get_client().save()
```
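All three hunks replace the thread lock with the shared storage lock and `with` with `async with`: waiting for the lock now suspends the coroutine instead of blocking the event loop's thread. A minimal illustration of the difference (illustrative code, not the project's lock implementation):

```python
import asyncio

_save_lock = asyncio.Lock()  # cooperative lock: awaiting it yields control

def save_index() -> None:
    pass  # stands in for NanoVectorDB's client.save()

async def index_done_callback() -> None:
    # Unlike `with ThreadLock():`, `async with` lets other tasks run
    # while this coroutine waits its turn.
    async with _save_lock:
        save_index()
```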
lightrag/kg/shared_storage.py CHANGED
```diff
@@ -125,13 +125,21 @@ def initialize_share_data(workers: int = 1):
     # Mark as initialized
     _initialized = True
 
-    # Initialize pipeline status for document indexing control
-    pipeline_namespace = get_namespace_data("pipeline_status")
 
-    # Create a shared list object for history_messages
-    history_messages = _manager.list() if is_multiprocess else []
-    pipeline_namespace.update(
-        {
+async def initialize_pipeline_namespace():
+    """
+    Initialize pipeline namespace with default values.
+    """
+    pipeline_namespace = await get_namespace_data("pipeline_status")
+
+    async with get_storage_lock():
+        # Check if already initialized by checking for required fields
+        if "busy" in pipeline_namespace:
+            return
+
+        # Create a shared list object for history_messages
+        history_messages = _manager.list() if is_multiprocess else []
+        pipeline_namespace.update({
             "busy": False,  # Control concurrent processes
             "job_name": "Default Job",  # Current job name (indexing files/indexing texts)
             "job_start": None,  # Job start time
@@ -141,8 +149,8 @@
             "request_pending": False,  # Flag for pending request for processing
             "latest_message": "",  # Latest message from pipeline processing
             "history_messages": history_messages,  # Use shared list object
-        }
-    )
+        })
+        direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
 
 
 async def get_update_flags(namespace: str):
```
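The `"busy" in pipeline_namespace` check under `get_storage_lock()` makes `initialize_pipeline_namespace()` idempotent: with several workers, the first caller populates the dict and every later caller returns immediately. A single-process sketch of that behavior (the real code shares the dict and history list across processes via a manager):

```python
import asyncio

pipeline_namespace: dict = {}
_lock = asyncio.Lock()

async def initialize_pipeline_namespace() -> None:
    async with _lock:
        if "busy" in pipeline_namespace:  # already initialized elsewhere
            return
        pipeline_namespace.update({"busy": False, "history_messages": []})

async def main() -> None:
    # Concurrent callers race harmlessly; exactly one performs the update.
    await asyncio.gather(*(initialize_pipeline_namespace() for _ in range(4)))
    assert pipeline_namespace == {"busy": False, "history_messages": []}

asyncio.run(main())
```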