yangdx committed
Commit 2f9e5fb · Parent(s): a2c0b5a
Refactor storage initialization to avoid redundant initial data loads across processes; show init logs on first load only
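All four storage backends touched below apply the same pattern: the per-namespace data object returned by get_namespace_data() is shared across worker processes, so the disk load (and its log line) is wrapped in the shared storage lock plus an emptiness check, and only the first process that finds the namespace empty pays the I/O cost and emits the log. Below is a minimal self-contained sketch of that guard, using a plain dict and a multiprocessing lock as stand-ins for the shared_storage helpers; the load_json helper and kv_store_* file naming mirror the diffs, everything else is illustrative.

import json
import logging
import os
from multiprocessing import Lock
from typing import Any

logger = logging.getLogger(__name__)

# Stand-ins for lightrag.kg.shared_storage.get_storage_lock / get_namespace_data:
# one lock and one registry of per-namespace dicts shared by all storage instances.
_storage_lock = Lock()
_namespaces: dict[str, dict[str, Any]] = {}


def load_json(file_name: str) -> dict[str, Any] | None:
    """Return the parsed JSON file, or None if it does not exist yet."""
    if not os.path.exists(file_name):
        return None
    with open(file_name, encoding="utf-8") as f:
        return json.load(f)


def init_namespace(namespace: str, working_dir: str) -> dict[str, Any]:
    """Load a namespace from disk exactly once; later initializers reuse the shared dict."""
    data = _namespaces.setdefault(namespace, {})
    file_name = os.path.join(working_dir, f"kv_store_{namespace}.json")
    with _storage_lock:
        if not data:  # only the first initializer loads from disk and logs
            data.update(load_json(file_name) or {})
            logger.info(f"Load KV {namespace} with {len(data)} data")
    return data

Because the emptiness check sits inside the lock, two initializers racing on the same namespace cannot both load or both log.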
lightrag/kg/faiss_impl.py
CHANGED
@@ -57,16 +57,16 @@ class FaissVectorDBStorage(BaseVectorStorage):
             # If you have a large number of vectors, you might want IVF or other indexes.
             # For demonstration, we use a simple IndexFlatIP.
             self._index.value = faiss.IndexFlatIP(self._dim)
+            # Keep a local store for metadata, IDs, etc.
+            # Maps <int faiss_id> → metadata (including your original ID).
+            self._id_to_meta.update({})
+            # Attempt to load an existing index + metadata from disk
+            self._load_faiss_index()
         else:
             if self._index is None:
                 self._index = faiss.IndexFlatIP(self._dim)
-
-
-        # Maps <int faiss_id> → metadata (including your original ID).
-        self._id_to_meta.update({})
-
-        # Attempt to load an existing index + metadata from disk
-        self._load_faiss_index()
+            self._id_to_meta.update({})
+            self._load_faiss_index()


     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
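As the retained comments note, IndexFlatIP is only the simplest choice and IVF-style indexes become attractive at larger scale. A tiny standalone sketch of how an IndexFlatIP behaves (the dimension and vectors are made up; embeddings are L2-normalized so the inner product acts as cosine similarity):

import numpy as np
import faiss  # pip install faiss-cpu

dim = 4
index = faiss.IndexFlatIP(dim)               # exact (brute-force) inner-product search
vectors = np.random.rand(10, dim).astype("float32")
faiss.normalize_L2(vectors)                  # in-place L2 normalization
index.add(vectors)                           # FAISS assigns sequential integer ids 0..9
scores, ids = index.search(vectors[:1], 3)   # top-3 matches for the first vector
print(ids[0], scores[0])                     # the query vector itself ranks first, score ~1.0

Those sequential integer ids are why the class keeps the separate _id_to_meta map from FAISS ids back to the caller's original IDs.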
lightrag/kg/json_doc_status_impl.py
CHANGED
@@ -26,8 +26,9 @@ class JsonDocStatusStorage(DocStatusStorage):
         self._storage_lock = get_storage_lock()
         self._data = get_namespace_data(self.namespace)
         with self._storage_lock:
-            self._data.update(load_json(self._file_name) or {})
-
+            if not self._data:
+                self._data.update(load_json(self._file_name) or {})
+                logger.info(f"Loaded document status storage with {len(self._data)} records")

     async def filter_keys(self, keys: set[str]) -> set[str]:
         """Return keys that should be processed (not in storage or not successfully processed)"""
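The guarded load is also what produces the "show init logs on first load only" behavior from the commit message. Continuing the init_namespace sketch from above (paths and the sample record are invented), a second initializer of the same namespace finds the shared dict already populated and neither reloads nor logs:

import json
import logging
import os
import tempfile

logging.basicConfig(level=logging.INFO)

tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "kv_store_doc_status.json"), "w", encoding="utf-8") as f:
    json.dump({"doc-1": {"status": "processed"}}, f)

first = init_namespace("doc_status", tmp_dir)   # loads 1 record and logs once
second = init_namespace("doc_status", tmp_dir)  # shared dict already populated: no reload, no log
assert first is second and len(second) == 1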
lightrag/kg/json_kv_impl.py
CHANGED
@@ -18,13 +18,13 @@ from .shared_storage import get_namespace_data, get_storage_lock
 class JsonKVStorage(BaseKVStorage):
     def __post_init__(self):
         working_dir = self.global_config["working_dir"]
+        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
         self._storage_lock = get_storage_lock()
         self._data = get_namespace_data(self.namespace)
         with self._storage_lock:
             if not self._data:
-                self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
                 self._data: dict[str, Any] = load_json(self._file_name) or {}
-
+                logger.info(f"Load KV {self.namespace} with {len(self._data)} data")


     async def index_done_callback(self) -> None:
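The JsonKVStorage hunk also moves the _file_name assignment ahead of the lock and out of the "if not self._data:" block, presumably because every instance needs the path later (for example when persisting in index_done_callback) even when another process already performed the initial load. A toy illustration of the hazard that placement avoids (the class and its attributes are hypothetical):

import os


class BrokenStore:
    """Sets the file path only when it performs the initial load (the old placement)."""

    def __init__(self, shared_data: dict, working_dir: str):
        self._data = shared_data
        if not self._data:  # a later initializer skips this branch entirely...
            self._file_name = os.path.join(working_dir, "kv_store_demo.json")
            self._data.update({"k": "v"})

    def save_path(self) -> str:
        return self._file_name  # ...so this raises AttributeError for that initializer


shared: dict = {}
BrokenStore(shared, "/tmp").save_path()       # fine: this instance did the load
try:
    BrokenStore(shared, "/tmp").save_path()   # AttributeError: _file_name was never set
except AttributeError as exc:
    print("second initializer:", exc)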
lightrag/kg/nano_vector_db_impl.py
CHANGED
@@ -47,14 +47,14 @@ class NanoVectorDBStorage(BaseVectorStorage):
             if self._client.value is None:
                 self._client.value = NanoVectorDB(
                     self.embedding_func.embedding_dim, storage_file=self._client_file_name
                 )
+                logger.info(f"Initialized vector DB client for namespace {self.namespace}")
         else:
             if self._client is None:
                 self._client = NanoVectorDB(
                     self.embedding_func.embedding_dim, storage_file=self._client_file_name
                 )
-
-            logger.info(f"Initialized vector DB client for namespace {self.namespace}")
+                logger.info(f"Initialized vector DB client for namespace {self.namespace}")

     def _get_client(self):
         """Get the appropriate client instance based on multiprocess mode"""
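The _get_client docstring in the context lines suggests the rest of the class hides the dual layout behind a single accessor. A hypothetical sketch of that shape (the is_multiprocess flag and SimpleNamespace container are assumptions; only the .value-versus-plain-attribute split comes from the hunk):

from types import SimpleNamespace


class ClientAccessor:
    """Toy model: in multiprocess mode self._client is a shared container with a .value
    slot; in single-process mode it is the client object itself."""

    def __init__(self, is_multiprocess: bool):
        self._is_multiprocess = is_multiprocess
        self._client = SimpleNamespace(value=None) if is_multiprocess else None

    def _get_client(self):
        """Get the appropriate client instance based on multiprocess mode."""
        return self._client.value if self._is_multiprocess else self._client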