yangdx committed
Commit 4e02c34 · 1 Parent(s): 08e5593

Fix linting

lightrag/api/routers/document_routes.py CHANGED
@@ -60,7 +60,9 @@ class InsertResponse(BaseModel):
 
 
 class ClearDocumentsResponse(BaseModel):
-    status: str = Field(description="Status of the clear operation: success/partial_success/busy/fail")
+    status: str = Field(
+        description="Status of the clear operation: success/partial_success/busy/fail"
+    )
     message: str = Field(description="Message describing the operation result")
 
 
@@ -448,7 +450,7 @@ async def pipeline_index_texts(rag: LightRAG, texts: List[str]):
     await rag.apipeline_process_enqueue_documents()
 
 
-# TODO: deprecate after /insert_file is removed
+# TODO: deprecate after /insert_file is removed
 async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
     """Save the uploaded file to a temporary location
 
@@ -783,7 +785,10 @@ def create_document_routes(
            HTTPException: Raised when a serious error occurs during the clearing process,
                           with status code 500 and error details in the detail field.
        """
-        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
+        from lightrag.kg.shared_storage import (
+            get_namespace_data,
+            get_pipeline_status_lock,
+        )
 
        # Get pipeline status and lock
        pipeline_status = await get_namespace_data("pipeline_status")
@@ -794,14 +799,16 @@ def create_document_routes(
        if pipeline_status.get("busy", False):
            return ClearDocumentsResponse(
                status="busy",
-                message="Cannot clear documents while pipeline is busy"
+                message="Cannot clear documents while pipeline is busy",
            )
        # Set busy to true
        pipeline_status["busy"] = True
        pipeline_status["job_name"] = "Clearing Documents"
        pipeline_status["latest_message"] = "Starting document clearing process"
        if "history_messages" in pipeline_status:
-            pipeline_status["history_messages"].append("Starting document clearing process")
+            pipeline_status["history_messages"].append(
+                "Starting document clearing process"
+            )
 
        try:
            # Use drop method to clear all data
@@ -813,25 +820,27 @@ def create_document_routes(
                rag.relationships_vdb,
                rag.chunks_vdb,
                rag.chunk_entity_relation_graph,
-                rag.doc_status
+                rag.doc_status,
            ]
-
+
            # Log storage drop start
            if "history_messages" in pipeline_status:
-                pipeline_status["history_messages"].append("Starting to drop storage components")
-
+                pipeline_status["history_messages"].append(
+                    "Starting to drop storage components"
+                )
+
            for storage in storages:
                if storage is not None:
                    drop_tasks.append(storage.drop())
-
+
            # Wait for all drop tasks to complete
            drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True)
-
+
            # Check for errors and log results
            errors = []
            storage_success_count = 0
            storage_error_count = 0
-
+
            for i, result in enumerate(drop_results):
                storage_name = storages[i].__class__.__name__
                if isinstance(result, Exception):
@@ -842,7 +851,7 @@ def create_document_routes(
                else:
                    logger.info(f"Successfully dropped {storage_name}")
                    storage_success_count += 1
-
+
            # Log storage drop results
            if "history_messages" in pipeline_status:
                if storage_error_count > 0:
@@ -853,26 +862,25 @@ def create_document_routes(
                    pipeline_status["history_messages"].append(
                        f"Successfully dropped all {storage_success_count} storage components"
                    )
-
+
            # If all storage operations failed, return error status and don't proceed with file deletion
            if storage_success_count == 0 and storage_error_count > 0:
                error_message = "All storage drop operations failed. Aborting document clearing process."
                logger.error(error_message)
                if "history_messages" in pipeline_status:
                    pipeline_status["history_messages"].append(error_message)
-                return ClearDocumentsResponse(
-                    status="fail",
-                    message=error_message
-                )
-
+                return ClearDocumentsResponse(status="fail", message=error_message)
+
            # Log file deletion start
            if "history_messages" in pipeline_status:
-                pipeline_status["history_messages"].append("Starting to delete files in input directory")
-
+                pipeline_status["history_messages"].append(
+                    "Starting to delete files in input directory"
+                )
+
            # Delete all files in input_dir
            deleted_files_count = 0
            file_errors_count = 0
-
+
            for file_path in doc_manager.input_dir.glob("**/*"):
                if file_path.is_file():
                    try:
@@ -881,7 +889,7 @@ def create_document_routes(
                    except Exception as e:
                        logger.error(f"Error deleting file {file_path}: {str(e)}")
                        file_errors_count += 1
-
+
            # Log file deletion results
            if "history_messages" in pipeline_status:
                if file_errors_count > 0:
@@ -893,7 +901,7 @@ def create_document_routes(
                    pipeline_status["history_messages"].append(
                        f"Successfully deleted {deleted_files_count} files"
                    )
-
+
            # Prepare final result message
            final_message = ""
            if errors:
@@ -903,16 +911,12 @@ def create_document_routes(
                final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
                status = "success"
 
-
            # Log final result
            if "history_messages" in pipeline_status:
                pipeline_status["history_messages"].append(final_message)
-
+
            # Return response based on results
-            return ClearDocumentsResponse(
-                status=status,
-                message=final_message
-            )
+            return ClearDocumentsResponse(status=status, message=final_message)
        except Exception as e:
            error_msg = f"Error clearing documents: {str(e)}"
            logger.error(error_msg)
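
Aside from the formatting changes, the clear-documents handler above is built around one pattern: schedule every backend's drop() coroutine and collect failures instead of letting the first exception abort the rest. The sketch below reduces that pattern to a few standalone lines; FakeStorage and clear_all are hypothetical stand-ins for illustration, not part of the commit or of LightRAG's API.

import asyncio


class FakeStorage:
    """Hypothetical stand-in for a LightRAG storage backend."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    async def drop(self) -> dict[str, str]:
        if self.fail:
            raise RuntimeError(f"{self.name} unavailable")
        return {"status": "success", "message": "data dropped"}


async def clear_all(storages):
    # Gather every drop() and keep exceptions as results, mirroring
    # asyncio.gather(*drop_tasks, return_exceptions=True) in the route above.
    results = await asyncio.gather(
        *(s.drop() for s in storages if s is not None), return_exceptions=True
    )
    success = sum(1 for r in results if not isinstance(r, Exception))
    return success, len(results) - success


if __name__ == "__main__":
    demo = [FakeStorage("entities_vdb"), FakeStorage("chunks_vdb", fail=True)]
    print(asyncio.run(clear_all(demo)))  # (1, 1): one dropped, one failed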
lightrag/base.py CHANGED
@@ -111,11 +111,11 @@ class StorageNameSpace(ABC):
     @abstractmethod
     async def index_done_callback(self) -> None:
         """Commit the storage operations after indexing"""
-
+
     @abstractmethod
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
-
+
         This abstract method defines the contract for dropping all data from a storage implementation.
         Each storage type must implement this method to:
         1. Clear all data from memory and/or external storage
@@ -124,14 +124,14 @@ class StorageNameSpace(ABC):
         4. Handle cleanup of any resources
         5. Notify other processes if necessary
         6. This action should persistent the data to disk immediately.
-
+
         Returns:
             dict[str, str]: Operation status and message with the following format:
             {
                 "status": str,  # "success" or "error"
                 "message": str  # "data dropped" on success, error details on failure
             }
-
+
         Implementation specific:
         - On success: return {"status": "success", "message": "data dropped"}
         - On failure: return {"status": "error", "message": "<error details>"}
@@ -238,42 +238,43 @@ class BaseKVStorage(StorageNameSpace, ABC):
     @abstractmethod
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         """Upsert data
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
+        2. update flags to notify other processes that data persistence is needed
         """
 
     @abstractmethod
     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed
-
+
         Args:
             ids (list[str]): List of document IDs to be deleted from storage
-
+
         Returns:
             None
         """
 
-    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
+    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed
-
+
         Args:
             modes (list[str]): List of cache modes to be dropped from storage
-
+
         Returns:
             True: if the cache drop successfully
             False: if the cache drop failed, or the cache mode is not supported
         """
 
+
 @dataclass
 class BaseGraphStorage(StorageNameSpace, ABC):
     embedding_func: EmbeddingFunc
@@ -394,7 +395,7 @@ class DocStatusStorage(BaseKVStorage, ABC):
     ) -> dict[str, DocProcessingStatus]:
         """Get all documents with a specific status"""
 
-    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
+    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Drop cache is not supported for Doc Status storage"""
         return False
 
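
The docstrings touched above spell out the drop() contract every storage backend must honour: clear everything, persist the empty state, and return either {"status": "success", "message": "data dropped"} or an error dict. A minimal illustration of that contract, using a hypothetical in-memory store rather than any of the real backends, might look like this:

from typing import Any


class InMemoryStore:
    """Hypothetical example only; not one of LightRAG's storage classes."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def drop(self) -> dict[str, str]:
        try:
            # 1. Clear all data held in memory
            self._data.clear()
            # 2./3. A real backend would also remove its persistence files and
            #       set update flags so other processes reload the empty state.
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            return {"status": "error", "message": str(e)}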
lightrag/kg/age_impl.py CHANGED
@@ -34,9 +34,9 @@ if not pm.is_installed("psycopg-pool"):
 if not pm.is_installed("asyncpg"):
     pm.install("asyncpg")
 
-import psycopg  # type: ignore
-from psycopg.rows import namedtuple_row  # type: ignore
-from psycopg_pool import AsyncConnectionPool, PoolTimeout  # type: ignore
+import psycopg  # type: ignore
+from psycopg.rows import namedtuple_row  # type: ignore
+from psycopg_pool import AsyncConnectionPool, PoolTimeout  # type: ignore
 
 
 class AGEQueryException(Exception):
@@ -871,10 +871,10 @@ class AGEStorage(BaseGraphStorage):
     async def index_done_callback(self) -> None:
         # AGES handles persistence automatically
         pass
-
+
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all nodes and relationships in the graph.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
lightrag/kg/chroma_impl.py CHANGED
@@ -11,8 +11,8 @@ import pipmaster as pm
 if not pm.is_installed("chromadb"):
     pm.install("chromadb")
 
-from chromadb import HttpClient, PersistentClient  # type: ignore
-from chromadb.config import Settings  # type: ignore
+from chromadb import HttpClient, PersistentClient  # type: ignore
+from chromadb.config import Settings  # type: ignore
 
 
 @final
@@ -336,12 +336,12 @@ class ChromaVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
-
+
     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources
-
+
         This method will delete all documents from the ChromaDB collection.
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -353,8 +353,10 @@ class ChromaVectorDBStorage(BaseVectorStorage):
             if result and result["ids"] and len(result["ids"]) > 0:
                 # Delete all documents
                 self._collection.delete(ids=result["ids"])
-
-            logger.info(f"Process {os.getpid()} drop ChromaDB collection {self.namespace}")
+
+            logger.info(
+                f"Process {os.getpid()} drop ChromaDB collection {self.namespace}"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}")
lightrag/kg/faiss_impl.py CHANGED
@@ -443,10 +443,10 @@ class FaissVectorDBStorage(BaseVectorStorage):
             results.append({**metadata, "id": metadata.get("__id__")})
 
         return results
-
+
     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources
-
+
         This method will:
         1. Remove the vector database storage file if it exists
         2. Reinitialize the vector database client
@@ -454,7 +454,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
         4. Changes is persisted to disk immediately
 
         This method will remove all vectors from the Faiss index and delete the storage files.
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -465,7 +465,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
             # Reset the index
             self._index = faiss.IndexFlatIP(self._dim)
             self._id_to_meta = {}
-
+
             # Remove storage files if they exist
             if os.path.exists(self._faiss_index_file):
                 os.remove(self._faiss_index_file)
@@ -478,7 +478,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
             # Notify other processes
             await set_all_update_flags(self.namespace)
             self.storage_updated.value = False
-
+
             logger.info(f"Process {os.getpid()} drop FAISS index {self.namespace}")
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
lightrag/kg/gremlin_impl.py CHANGED
@@ -24,9 +24,9 @@ from ..base import BaseGraphStorage
 if not pm.is_installed("gremlinpython"):
     pm.install("gremlinpython")
 
-from gremlin_python.driver import client, serializer  # type: ignore
-from gremlin_python.driver.aiohttp.transport import AiohttpTransport  # type: ignore
-from gremlin_python.driver.protocol import GremlinServerError  # type: ignore
+from gremlin_python.driver import client, serializer  # type: ignore
+from gremlin_python.driver.aiohttp.transport import AiohttpTransport  # type: ignore
+from gremlin_python.driver.protocol import GremlinServerError  # type: ignore
 
 
 @final
@@ -695,13 +695,13 @@ class GremlinStorage(BaseGraphStorage):
         except Exception as e:
             logger.error(f"Error during edge deletion: {str(e)}")
             raise
-
+
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all nodes and relationships in the graph.
-
+
         This function deletes all nodes with the specified graph name property,
         which automatically removes all associated edges.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -112,7 +112,7 @@ class JsonDocStatusStorage(DocStatusStorage):
         """
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
+        2. update flags to notify other processes that data persistence is needed
         """
         if not data:
             return
@@ -129,14 +129,14 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     async def delete(self, doc_ids: list[str]) -> None:
         """Delete specific records from storage by their IDs
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
-
+        2. update flags to notify other processes that data persistence is needed
+
         Args:
             ids (list[str]): List of document IDs to be deleted from storage
-
+
         Returns:
             None
         """
@@ -147,12 +147,12 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources
-
+
         This method will:
         1. Clear all document status data from memory
        2. Update flags to notify other processes
        3. Trigger index_done_callback to save the empty state
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
lightrag/kg/json_kv_impl.py CHANGED
@@ -117,7 +117,7 @@ class JsonKVStorage(BaseKVStorage):
         """
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
-        2. update flags to notify other processes that data persistence is needed
+        2. update flags to notify other processes that data persistence is needed
         """
         if not data:
             return
@@ -128,14 +128,14 @@ class JsonKVStorage(BaseKVStorage):
 
     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed
-
+
         Args:
             ids (list[str]): List of document IDs to be deleted from storage
-
+
         Returns:
             None
         """
@@ -144,39 +144,38 @@ class JsonKVStorage(BaseKVStorage):
             self._data.pop(doc_id, None)
         await set_all_update_flags(self.namespace)
 
-    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
+    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by by cache mode
-
+
         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed
-
+
         Args:
             ids (list[str]): List of cache mode to be drop from storage
-
+
         Returns:
             True: if the cache drop successfully
             False: if the cache drop failed
         """
         if not modes:
             return False
-
+
         try:
             await self.delete(modes)
             return True
         except Exception:
             return False
 
-
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
         This action will persistent the data to disk immediately.
-
+
         This method will:
         1. Clear all data from memory
         2. Update flags to notify other processes
         3. Trigger index_done_callback to save the empty state
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
lightrag/kg/milvus_impl.py CHANGED
@@ -15,7 +15,7 @@ if not pm.is_installed("pymilvus"):
     pm.install("pymilvus")
 
 import configparser
-from pymilvus import MilvusClient  # type: ignore
+from pymilvus import MilvusClient  # type: ignore
 
 config = configparser.ConfigParser()
 config.read("config.ini", "utf-8")
@@ -287,12 +287,12 @@ class MilvusVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
-
+
     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources
-
+
         This method will delete all data from the Milvus collection.
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -302,15 +302,17 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             # Drop the collection and recreate it
             if self._client.has_collection(self.namespace):
                 self._client.drop_collection(self.namespace)
-
+
             # Recreate the collection
             MilvusVectorDBStorage.create_collection_if_not_exist(
                 self._client,
                 self.namespace,
                 dimension=self.embedding_func.embedding_dim,
             )
-
-            logger.info(f"Process {os.getpid()} drop Milvus collection {self.namespace}")
+
+            logger.info(
+                f"Process {os.getpid()} drop Milvus collection {self.namespace}"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping Milvus collection {self.namespace}: {e}")
lightrag/kg/mongo_impl.py CHANGED
@@ -25,13 +25,13 @@ if not pm.is_installed("pymongo"):
 if not pm.is_installed("motor"):
     pm.install("motor")
 
-from motor.motor_asyncio import (  # type: ignore
+from motor.motor_asyncio import (  # type: ignore
     AsyncIOMotorClient,
     AsyncIOMotorDatabase,
     AsyncIOMotorCollection,
 )
-from pymongo.operations import SearchIndexModel  # type: ignore
-from pymongo.errors import PyMongoError  # type: ignore
+from pymongo.operations import SearchIndexModel  # type: ignore
+from pymongo.errors import PyMongoError  # type: ignore
 
 config = configparser.ConfigParser()
 config.read("config.ini", "utf-8")
@@ -149,34 +149,36 @@ class MongoKVStorage(BaseKVStorage):
     async def index_done_callback(self) -> None:
         # Mongo handles persistence automatically
         pass
-
+
     async def delete(self, ids: list[str]) -> None:
         """Delete documents with specified IDs
-
+
         Args:
             ids: List of document IDs to be deleted
         """
         if not ids:
             return
-
+
         try:
             result = await self._data.delete_many({"_id": {"$in": ids}})
-            logger.info(f"Deleted {result.deleted_count} documents from {self.namespace}")
+            logger.info(
+                f"Deleted {result.deleted_count} documents from {self.namespace}"
+            )
         except PyMongoError as e:
             logger.error(f"Error deleting documents from {self.namespace}: {e}")
-
+
     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode
-
+
         Args:
             modes (list[str]): List of cache modes to be dropped from storage
-
+
         Returns:
             bool: True if successful, False otherwise
         """
         if not modes:
             return False
-
+
         try:
             # Build regex pattern to match documents with the specified modes
             pattern = f"^({'|'.join(modes)})_"
@@ -189,16 +191,21 @@ class MongoKVStorage(BaseKVStorage):
 
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count
-
-            logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+
+            logger.info(
+                f"Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping doc status {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -282,19 +289,24 @@ class MongoDocStatusStorage(DocStatusStorage):
     async def index_done_callback(self) -> None:
         # Mongo handles persistence automatically
         pass
-
+
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count
-
-            logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+
+            logger.info(
+                f"Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping doc status {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -911,16 +923,21 @@ class MongoGraphStorage(BaseGraphStorage):
 
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self.collection.delete_many({})
             deleted_count = result.deleted_count
-
-            logger.info(f"Dropped {deleted_count} documents from graph {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+
+            logger.info(
+                f"Dropped {deleted_count} documents from graph {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping graph {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -1211,10 +1228,10 @@ class MongoVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
-
+
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection and recreating vector index.
-
+
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
@@ -1222,12 +1239,17 @@ class MongoVectorDBStorage(BaseVectorStorage):
             # Delete all documents
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count
-
+
             # Recreate vector index
             await self.create_vector_index_if_not_exists()
-
-            logger.info(f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index")
-            return {"status": "success", "message": f"{deleted_count} documents dropped and vector index recreated"}
+
+            logger.info(
+                f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped and vector index recreated",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping vector storage {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
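
One detail worth calling out in MongoKVStorage.drop_cache_by_modes above is the regex it builds, f"^({'|'.join(modes)})_", which suggests cache records are keyed by a "<mode>_" prefix (an assumption here, since the filter itself is not shown in this diff). The snippet below only demonstrates what that pattern matches; the example ids are made up for illustration.

import re

modes = ["local", "global"]
pattern = re.compile(f"^({'|'.join(modes)})_")

# Hypothetical cache ids following a "<mode>_<hash>" convention
ids = ["local_ab12", "global_cd34", "hybrid_ef56"]
print([i for i in ids if pattern.match(i)])  # ['local_ab12', 'global_cd34']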
lightrag/kg/nano_vector_db_impl.py CHANGED
@@ -309,7 +309,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
 
     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources
-
+
         This method will:
         1. Remove the vector database storage file if it exists
         2. Reinitialize the vector database client
@@ -317,7 +317,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
         4. Changes is persisted to disk immediately
 
         This method is intended for use in scenarios where all data needs to be removed,
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -339,7 +339,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
             # Reset own update flag to avoid self-reloading
             self.storage_updated.value = False
 
-            logger.info(f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})")
+            logger.info(
+                f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping {self.namespace}: {e}")
lightrag/kg/neo4j_impl.py CHANGED
@@ -1028,12 +1028,12 @@ class Neo4JStorage(BaseGraphStorage):
         self, algorithm: str
     ) -> tuple[np.ndarray[Any, Any], list[str]]:
         raise NotImplementedError
-
+
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
-
+
         This method will delete all nodes and relationships in the Neo4j database.
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -1045,8 +1045,10 @@ class Neo4JStorage(BaseGraphStorage):
                 query = "MATCH (n) DETACH DELETE n"
                 result = await session.run(query)
                 await result.consume()  # Ensure result is fully consumed
-
-            logger.info(f"Process {os.getpid()} drop Neo4j database {self._DATABASE}")
+
+            logger.info(
+                f"Process {os.getpid()} drop Neo4j database {self._DATABASE}"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")
lightrag/kg/networkx_impl.py CHANGED
@@ -457,13 +457,13 @@ class NetworkXStorage(BaseGraphStorage):
 
     async def drop(self) -> dict[str, str]:
         """Drop all graph data from storage and clean up resources
-
+
         This method will:
         1. Remove the graph storage file if it exists
         2. Reset the graph to an empty state
         3. Update flags to notify other processes
         4. Changes is persisted to disk immediately
-
+
         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -479,7 +479,9 @@ class NetworkXStorage(BaseGraphStorage):
             await set_all_update_flags(self.namespace)
             # Reset own update flag to avoid self-reloading
             self.storage_updated.value = False
-            logger.info(f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})")
+            logger.info(
+                f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping graph {self.namespace}: {e}")
lightrag/kg/oracle_impl.py CHANGED
@@ -27,7 +27,7 @@ if not pm.is_installed("oracledb"):
     pm.install("oracledb")
 
 from graspologic import embed
-import oracledb  # type: ignore
+import oracledb  # type: ignore
 
 
 class OracleDB:
@@ -406,43 +406,45 @@ class OracleKVStorage(BaseKVStorage):
             if not table_name:
                 logger.error(f"Unknown namespace for deletion: {self.namespace}")
                 return
-
+
             ids_list = ",".join([f"'{id}'" for id in ids])
             delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"
-
+
             await self.db.execute(delete_sql, {"workspace": self.db.workspace})
-            logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
+            logger.info(
+                f"Successfully deleted {len(ids)} records from {self.namespace}"
+            )
         except Exception as e:
             logger.error(f"Error deleting records from {self.namespace}: {e}")
 
     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode
-
+
         Args:
             modes (list[str]): List of cache modes to be dropped from storage
-
+
         Returns:
             bool: True if successful, False otherwise
         """
         if not modes:
             return False
-
+
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
                 return False
-
+
             if table_name != "LIGHTRAG_LLM_CACHE":
                 return False
-
+
             # 构建Oracle风格的IN查询
             modes_list = ", ".join([f"'{mode}'" for mode in modes])
             sql = f"""
             DELETE FROM {table_name}
-            WHERE workspace = :workspace
+            WHERE workspace = :workspace
             AND cache_mode IN ({modes_list})
             """
-
+
             logger.info(f"Deleting cache by modes: {modes}")
             await self.db.execute(sql, {"workspace": self.db.workspace})
             return True
@@ -455,8 +457,11 @@ class OracleKVStorage(BaseKVStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
-
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }
+
             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -683,8 +688,11 @@ class OracleVectorDBStorage(BaseVectorStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
-
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }
+
             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
            )
@@ -1025,12 +1033,16 @@ class OracleGraphStorage(BaseGraphStorage):
         """Drop the storage"""
         try:
             # 使用图形查询删除所有节点和关系
-            delete_edges_sql = """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace"""
+            delete_edges_sql = (
+                """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace"""
+            )
             await self.db.execute(delete_edges_sql, {"workspace": self.db.workspace})
-
-            delete_nodes_sql = """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace"""
+
+            delete_nodes_sql = (
+                """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace"""
+            )
             await self.db.execute(delete_nodes_sql, {"workspace": self.db.workspace})
-
+
             return {"status": "success", "message": "graph data dropped"}
         except Exception as e:
             logger.error(f"Error dropping graph: {e}")
lightrag/kg/postgres_impl.py CHANGED
@@ -380,10 +380,10 @@ class PGKVStorage(BaseKVStorage):
380
 
381
  async def delete(self, ids: list[str]) -> None:
382
  """Delete specific records from storage by their IDs
383
-
384
  Args:
385
  ids (list[str]): List of document IDs to be deleted from storage
386
-
387
  Returns:
388
  None
389
  """
@@ -398,40 +398,41 @@ class PGKVStorage(BaseKVStorage):
398
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
399
 
400
  try:
401
- await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
402
- logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}")
 
 
 
 
403
  except Exception as e:
404
  logger.error(f"Error while deleting records from {self.namespace}: {e}")
405
 
406
  async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
407
  """Delete specific records from storage by cache mode
408
-
409
  Args:
410
  modes (list[str]): List of cache modes to be dropped from storage
411
-
412
  Returns:
413
  bool: True if successful, False otherwise
414
  """
415
  if not modes:
416
  return False
417
-
418
  try:
419
  table_name = namespace_to_table_name(self.namespace)
420
  if not table_name:
421
  return False
422
-
423
  if table_name != "LIGHTRAG_LLM_CACHE":
424
  return False
425
-
426
  sql = f"""
427
  DELETE FROM {table_name}
428
  WHERE workspace = $1 AND mode = ANY($2)
429
  """
430
- params = {
431
- "workspace": self.db.workspace,
432
- "modes": modes
433
- }
434
-
435
  logger.info(f"Deleting cache by modes: {modes}")
436
  await self.db.execute(sql, params)
437
  return True
@@ -444,8 +445,11 @@ class PGKVStorage(BaseKVStorage):
444
  try:
445
  table_name = namespace_to_table_name(self.namespace)
446
  if not table_name:
447
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
448
-
449
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
450
  table_name=table_name
451
  )
@@ -622,7 +626,9 @@ class PGVectorStorage(BaseVectorStorage):
622
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
623
 
624
  try:
625
- await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
626
  logger.debug(
627
  f"Successfully deleted {len(ids)} vectors from {self.namespace}"
628
  )
@@ -759,8 +765,11 @@ class PGVectorStorage(BaseVectorStorage):
759
  try:
760
  table_name = namespace_to_table_name(self.namespace)
761
  if not table_name:
762
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
763
-
764
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
765
  table_name=table_name
766
  )
@@ -930,8 +939,11 @@ class PGDocStatusStorage(DocStatusStorage):
930
  try:
931
  table_name = namespace_to_table_name(self.namespace)
932
  if not table_name:
933
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
934
-
935
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
936
  table_name=table_name
937
  )
@@ -1626,7 +1638,7 @@ class PGGraphStorage(BaseGraphStorage):
1626
  MATCH (n)
1627
  DETACH DELETE n
1628
  $$) AS (result agtype)"""
1629
-
1630
  await self._query(drop_query, readonly=False)
1631
  return {"status": "success", "message": "graph data dropped"}
1632
  except Exception as e:
@@ -1812,7 +1824,7 @@ SQL_TEMPLATES = {
1812
  chunk_ids=EXCLUDED.chunk_ids,
1813
  file_path=EXCLUDED.file_path,
1814
  update_time = CURRENT_TIMESTAMP
1815
- """,
1816
  "relationships": """
1817
  WITH relevant_chunks AS (
1818
  SELECT id as chunk_id
 
380
 
381
  async def delete(self, ids: list[str]) -> None:
382
  """Delete specific records from storage by their IDs
383
+
384
  Args:
385
  ids (list[str]): List of document IDs to be deleted from storage
386
+
387
  Returns:
388
  None
389
  """
 
398
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
399
 
400
  try:
401
+ await self.db.execute(
402
+ delete_sql, {"workspace": self.db.workspace, "ids": ids}
403
+ )
404
+ logger.debug(
405
+ f"Successfully deleted {len(ids)} records from {self.namespace}"
406
+ )
407
  except Exception as e:
408
  logger.error(f"Error while deleting records from {self.namespace}: {e}")
409
 
410
  async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
411
  """Delete specific records from storage by cache mode
412
+
413
  Args:
414
  modes (list[str]): List of cache modes to be dropped from storage
415
+
416
  Returns:
417
  bool: True if successful, False otherwise
418
  """
419
  if not modes:
420
  return False
421
+
422
  try:
423
  table_name = namespace_to_table_name(self.namespace)
424
  if not table_name:
425
  return False
426
+
427
  if table_name != "LIGHTRAG_LLM_CACHE":
428
  return False
429
+
430
  sql = f"""
431
  DELETE FROM {table_name}
432
  WHERE workspace = $1 AND mode = ANY($2)
433
  """
434
+ params = {"workspace": self.db.workspace, "modes": modes}
435
+
436
  logger.info(f"Deleting cache by modes: {modes}")
437
  await self.db.execute(sql, params)
438
  return True
 
445
  try:
446
  table_name = namespace_to_table_name(self.namespace)
447
  if not table_name:
448
+ return {
449
+ "status": "error",
450
+ "message": f"Unknown namespace: {self.namespace}",
451
+ }
452
+
453
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
454
  table_name=table_name
455
  )
 
626
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
627
 
628
  try:
629
+ await self.db.execute(
630
+ delete_sql, {"workspace": self.db.workspace, "ids": ids}
631
+ )
632
  logger.debug(
633
  f"Successfully deleted {len(ids)} vectors from {self.namespace}"
634
  )
 
765
  try:
766
  table_name = namespace_to_table_name(self.namespace)
767
  if not table_name:
768
+ return {
769
+ "status": "error",
770
+ "message": f"Unknown namespace: {self.namespace}",
771
+ }
772
+
773
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
774
  table_name=table_name
775
  )
 
939
  try:
940
  table_name = namespace_to_table_name(self.namespace)
941
  if not table_name:
942
+ return {
943
+ "status": "error",
944
+ "message": f"Unknown namespace: {self.namespace}",
945
+ }
946
+
947
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
948
  table_name=table_name
949
  )
 
1638
  MATCH (n)
1639
  DETACH DELETE n
1640
  $$) AS (result agtype)"""
1641
+
1642
  await self._query(drop_query, readonly=False)
1643
  return {"status": "success", "message": "graph data dropped"}
1644
  except Exception as e:
 
1824
  chunk_ids=EXCLUDED.chunk_ids,
1825
  file_path=EXCLUDED.file_path,
1826
  update_time = CURRENT_TIMESTAMP
1827
+ """,
1828
  "relationships": """
1829
  WITH relevant_chunks AS (
1830
  SELECT id as chunk_id
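
Note on the deletion style kept in the Postgres hunks above: the ID list is bound as a parameter and matched with id = ANY($2), rather than spliced into the SQL text the way the Oracle/TiDB variants assemble their IN lists from f-strings. A rough sketch of that parameterized form with asyncpg — the DSN, table name t, and columns are placeholders, not LightRAG's actual schema:

# Sketch only: assumes a reachable Postgres at `dsn` and a table t(workspace text, id text).
import asyncio
import asyncpg


async def delete_by_ids(dsn: str, workspace: str, ids: list[str]) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        # The Python list binds as a Postgres array; no values are formatted into the SQL string.
        await conn.execute(
            "DELETE FROM t WHERE workspace = $1 AND id = ANY($2::text[])",
            workspace,
            ids,
        )
    finally:
        await conn.close()


# asyncio.run(delete_by_ids("postgresql://user:pass@localhost/db", "default", ["id-1", "id-2"]))
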
lightrag/kg/qdrant_impl.py CHANGED
@@ -13,11 +13,12 @@ import pipmaster as pm
13
  if not pm.is_installed("qdrant-client"):
14
  pm.install("qdrant-client")
15
 
16
- from qdrant_client import QdrantClient, models # type: ignore
17
 
18
  config = configparser.ConfigParser()
19
  config.read("config.ini", "utf-8")
20
 
 
21
  def compute_mdhash_id_for_qdrant(
22
  content: str, prefix: str = "", style: str = "simple"
23
  ) -> str:
@@ -272,7 +273,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
272
  except Exception as e:
273
  logger.error(f"Error searching for prefix '{prefix}': {e}")
274
  return []
275
-
276
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
277
  """Get vector data by its ID
278
 
@@ -285,22 +286,22 @@ class QdrantVectorDBStorage(BaseVectorStorage):
285
  try:
286
  # Convert to Qdrant compatible ID
287
  qdrant_id = compute_mdhash_id_for_qdrant(id)
288
-
289
  # Retrieve the point by ID
290
  result = self._client.retrieve(
291
  collection_name=self.namespace,
292
  ids=[qdrant_id],
293
  with_payload=True,
294
  )
295
-
296
  if not result:
297
  return None
298
-
299
  return result[0].payload
300
  except Exception as e:
301
  logger.error(f"Error retrieving vector data for ID {id}: {e}")
302
  return None
303
-
304
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
305
  """Get multiple vector data by their IDs
306
 
@@ -312,28 +313,28 @@ class QdrantVectorDBStorage(BaseVectorStorage):
312
  """
313
  if not ids:
314
  return []
315
-
316
  try:
317
  # Convert to Qdrant compatible IDs
318
  qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
319
-
320
  # Retrieve the points by IDs
321
  results = self._client.retrieve(
322
  collection_name=self.namespace,
323
  ids=qdrant_ids,
324
  with_payload=True,
325
  )
326
-
327
  return [point.payload for point in results]
328
  except Exception as e:
329
  logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
330
  return []
331
-
332
  async def drop(self) -> dict[str, str]:
333
  """Drop all vector data from storage and clean up resources
334
-
335
  This method will delete all data from the Qdrant collection.
336
-
337
  Returns:
338
  dict[str, str]: Operation status and message
339
  - On success: {"status": "success", "message": "data dropped"}
@@ -343,17 +344,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
343
  # Delete the collection and recreate it
344
  if self._client.collection_exists(self.namespace):
345
  self._client.delete_collection(self.namespace)
346
-
347
  # Recreate the collection
348
  QdrantVectorDBStorage.create_collection_if_not_exist(
349
  self._client,
350
  self.namespace,
351
  vectors_config=models.VectorParams(
352
- size=self.embedding_func.embedding_dim, distance=models.Distance.COSINE
 
353
  ),
354
  )
355
-
356
- logger.info(f"Process {os.getpid()} drop Qdrant collection {self.namespace}")
357
  return {"status": "success", "message": "data dropped"}
358
  except Exception as e:
359
  logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}")
 
13
  if not pm.is_installed("qdrant-client"):
14
  pm.install("qdrant-client")
15
 
16
+ from qdrant_client import QdrantClient, models # type: ignore
17
 
18
  config = configparser.ConfigParser()
19
  config.read("config.ini", "utf-8")
20
 
21
+
22
  def compute_mdhash_id_for_qdrant(
23
  content: str, prefix: str = "", style: str = "simple"
24
  ) -> str:
 
273
  except Exception as e:
274
  logger.error(f"Error searching for prefix '{prefix}': {e}")
275
  return []
276
+
277
  async def get_by_id(self, id: str) -> dict[str, Any] | None:
278
  """Get vector data by its ID
279
 
 
286
  try:
287
  # Convert to Qdrant compatible ID
288
  qdrant_id = compute_mdhash_id_for_qdrant(id)
289
+
290
  # Retrieve the point by ID
291
  result = self._client.retrieve(
292
  collection_name=self.namespace,
293
  ids=[qdrant_id],
294
  with_payload=True,
295
  )
296
+
297
  if not result:
298
  return None
299
+
300
  return result[0].payload
301
  except Exception as e:
302
  logger.error(f"Error retrieving vector data for ID {id}: {e}")
303
  return None
304
+
305
  async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
306
  """Get multiple vector data by their IDs
307
 
 
313
  """
314
  if not ids:
315
  return []
316
+
317
  try:
318
  # Convert to Qdrant compatible IDs
319
  qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
320
+
321
  # Retrieve the points by IDs
322
  results = self._client.retrieve(
323
  collection_name=self.namespace,
324
  ids=qdrant_ids,
325
  with_payload=True,
326
  )
327
+
328
  return [point.payload for point in results]
329
  except Exception as e:
330
  logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
331
  return []
332
+
333
  async def drop(self) -> dict[str, str]:
334
  """Drop all vector data from storage and clean up resources
335
+
336
  This method will delete all data from the Qdrant collection.
337
+
338
  Returns:
339
  dict[str, str]: Operation status and message
340
  - On success: {"status": "success", "message": "data dropped"}
 
344
  # Delete the collection and recreate it
345
  if self._client.collection_exists(self.namespace):
346
  self._client.delete_collection(self.namespace)
347
+
348
  # Recreate the collection
349
  QdrantVectorDBStorage.create_collection_if_not_exist(
350
  self._client,
351
  self.namespace,
352
  vectors_config=models.VectorParams(
353
+ size=self.embedding_func.embedding_dim,
354
+ distance=models.Distance.COSINE,
355
  ),
356
  )
357
+
358
+ logger.info(
359
+ f"Process {os.getpid()} drop Qdrant collection {self.namespace}"
360
+ )
361
  return {"status": "success", "message": "data dropped"}
362
  except Exception as e:
363
  logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}")
lightrag/kg/redis_impl.py CHANGED
@@ -8,7 +8,7 @@ if not pm.is_installed("redis"):
8
  pm.install("redis")
9
 
10
  # aioredis is a deprecated library, replaced with redis
11
- from redis.asyncio import Redis # type: ignore
12
  from lightrag.utils import logger
13
  from lightrag.base import BaseKVStorage
14
  import json
@@ -83,51 +83,51 @@ class RedisKVStorage(BaseKVStorage):
83
  logger.info(
84
  f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
85
  )
86
-
87
- async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
88
  """Delete specific records from storage by by cache mode
89
-
90
  Important notes for Redis storage:
91
  1. This will immediately delete the specified cache modes from Redis
92
-
93
  Args:
94
  modes (list[str]): List of cache modes to be dropped from storage
95
-
96
  Returns:
97
  True: if the cache was dropped successfully
98
  False: if the cache drop failed
99
  """
100
  if not modes:
101
  return False
102
-
103
  try:
104
  await self.delete(modes)
105
  return True
106
  except Exception:
107
  return False
108
-
109
  async def drop(self) -> dict[str, str]:
110
  """Drop the storage by removing all keys under the current namespace.
111
-
112
  Returns:
113
  dict[str, str]: Status of the operation with keys 'status' and 'message'
114
  """
115
  try:
116
  keys = await self._redis.keys(f"{self.namespace}:*")
117
-
118
  if keys:
119
  pipe = self._redis.pipeline()
120
  for key in keys:
121
  pipe.delete(key)
122
  results = await pipe.execute()
123
  deleted_count = sum(results)
124
-
125
  logger.info(f"Dropped {deleted_count} keys from {self.namespace}")
126
  return {"status": "success", "message": f"{deleted_count} keys dropped"}
127
  else:
128
  logger.info(f"No keys found to drop in {self.namespace}")
129
  return {"status": "success", "message": "no keys to drop"}
130
-
131
  except Exception as e:
132
  logger.error(f"Error dropping keys from {self.namespace}: {e}")
133
  return {"status": "error", "message": str(e)}
 
8
  pm.install("redis")
9
 
10
  # aioredis is a deprecated library, replaced with redis
11
+ from redis.asyncio import Redis # type: ignore
12
  from lightrag.utils import logger
13
  from lightrag.base import BaseKVStorage
14
  import json
 
83
  logger.info(
84
  f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
85
  )
86
+
87
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
88
  """Delete specific records from storage by by cache mode
89
+
90
  Important notes for Redis storage:
91
  1. This will immediately delete the specified cache modes from Redis
92
+
93
  Args:
94
  modes (list[str]): List of cache modes to be dropped from storage
95
+
96
  Returns:
97
  True: if the cache was dropped successfully
98
  False: if the cache drop failed
99
  """
100
  if not modes:
101
  return False
102
+
103
  try:
104
  await self.delete(modes)
105
  return True
106
  except Exception:
107
  return False
108
+
109
  async def drop(self) -> dict[str, str]:
110
  """Drop the storage by removing all keys under the current namespace.
111
+
112
  Returns:
113
  dict[str, str]: Status of the operation with keys 'status' and 'message'
114
  """
115
  try:
116
  keys = await self._redis.keys(f"{self.namespace}:*")
117
+
118
  if keys:
119
  pipe = self._redis.pipeline()
120
  for key in keys:
121
  pipe.delete(key)
122
  results = await pipe.execute()
123
  deleted_count = sum(results)
124
+
125
  logger.info(f"Dropped {deleted_count} keys from {self.namespace}")
126
  return {"status": "success", "message": f"{deleted_count} keys dropped"}
127
  else:
128
  logger.info(f"No keys found to drop in {self.namespace}")
129
  return {"status": "success", "message": "no keys to drop"}
130
+
131
  except Exception as e:
132
  logger.error(f"Error dropping keys from {self.namespace}: {e}")
133
  return {"status": "error", "message": str(e)}
lightrag/kg/tidb_impl.py CHANGED
@@ -20,7 +20,7 @@ if not pm.is_installed("pymysql"):
20
  if not pm.is_installed("sqlalchemy"):
21
  pm.install("sqlalchemy")
22
 
23
- from sqlalchemy import create_engine, text # type: ignore
24
 
25
 
26
  class TiDB:
@@ -290,47 +290,49 @@ class TiDBKVStorage(BaseKVStorage):
290
  try:
291
  table_name = namespace_to_table_name(self.namespace)
292
  id_field = namespace_to_id(self.namespace)
293
-
294
  if not table_name or not id_field:
295
  logger.error(f"Unknown namespace for deletion: {self.namespace}")
296
  return
297
-
298
  ids_list = ",".join([f"'{id}'" for id in ids])
299
  delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"
300
-
301
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
302
- logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
303
  except Exception as e:
304
  logger.error(f"Error deleting records from {self.namespace}: {e}")
305
 
306
  async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
307
  """Delete specific records from storage by cache mode
308
-
309
  Args:
310
  modes (list[str]): List of cache modes to be dropped from storage
311
-
312
  Returns:
313
  bool: True if successful, False otherwise
314
  """
315
  if not modes:
316
  return False
317
-
318
  try:
319
  table_name = namespace_to_table_name(self.namespace)
320
  if not table_name:
321
  return False
322
-
323
  if table_name != "LIGHTRAG_LLM_CACHE":
324
  return False
325
-
326
  # Build a MySQL-style IN clause
327
  modes_list = ", ".join([f"'{mode}'" for mode in modes])
328
  sql = f"""
329
  DELETE FROM {table_name}
330
- WHERE workspace = :workspace
331
  AND mode IN ({modes_list})
332
  """
333
-
334
  logger.info(f"Deleting cache by modes: {modes}")
335
  await self.db.execute(sql, {"workspace": self.db.workspace})
336
  return True
@@ -343,8 +345,11 @@ class TiDBKVStorage(BaseKVStorage):
343
  try:
344
  table_name = namespace_to_table_name(self.namespace)
345
  if not table_name:
346
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
347
-
348
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
349
  table_name=table_name
350
  )
@@ -492,7 +497,7 @@ class TiDBVectorDBStorage(BaseVectorStorage):
492
 
493
  table_name = namespace_to_table_name(self.namespace)
494
  id_field = namespace_to_id(self.namespace)
495
-
496
  if not table_name or not id_field:
497
  logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
498
  return
@@ -502,7 +507,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
502
 
503
  try:
504
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
505
- logger.debug(f"Successfully deleted {len(ids)} vectors from {self.namespace}")
506
  except Exception as e:
507
  logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
508
 
@@ -551,8 +558,11 @@ class TiDBVectorDBStorage(BaseVectorStorage):
551
  try:
552
  table_name = namespace_to_table_name(self.namespace)
553
  if not table_name:
554
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
555
-
556
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
557
  table_name=table_name
558
  )
 
20
  if not pm.is_installed("sqlalchemy"):
21
  pm.install("sqlalchemy")
22
 
23
+ from sqlalchemy import create_engine, text # type: ignore
24
 
25
 
26
  class TiDB:
 
290
  try:
291
  table_name = namespace_to_table_name(self.namespace)
292
  id_field = namespace_to_id(self.namespace)
293
+
294
  if not table_name or not id_field:
295
  logger.error(f"Unknown namespace for deletion: {self.namespace}")
296
  return
297
+
298
  ids_list = ",".join([f"'{id}'" for id in ids])
299
  delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"
300
+
301
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
302
+ logger.info(
303
+ f"Successfully deleted {len(ids)} records from {self.namespace}"
304
+ )
305
  except Exception as e:
306
  logger.error(f"Error deleting records from {self.namespace}: {e}")
307
 
308
  async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
309
  """Delete specific records from storage by cache mode
310
+
311
  Args:
312
  modes (list[str]): List of cache modes to be dropped from storage
313
+
314
  Returns:
315
  bool: True if successful, False otherwise
316
  """
317
  if not modes:
318
  return False
319
+
320
  try:
321
  table_name = namespace_to_table_name(self.namespace)
322
  if not table_name:
323
  return False
324
+
325
  if table_name != "LIGHTRAG_LLM_CACHE":
326
  return False
327
+
328
  # Build a MySQL-style IN clause
329
  modes_list = ", ".join([f"'{mode}'" for mode in modes])
330
  sql = f"""
331
  DELETE FROM {table_name}
332
+ WHERE workspace = :workspace
333
  AND mode IN ({modes_list})
334
  """
335
+
336
  logger.info(f"Deleting cache by modes: {modes}")
337
  await self.db.execute(sql, {"workspace": self.db.workspace})
338
  return True
 
345
  try:
346
  table_name = namespace_to_table_name(self.namespace)
347
  if not table_name:
348
+ return {
349
+ "status": "error",
350
+ "message": f"Unknown namespace: {self.namespace}",
351
+ }
352
+
353
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
354
  table_name=table_name
355
  )
 
497
 
498
  table_name = namespace_to_table_name(self.namespace)
499
  id_field = namespace_to_id(self.namespace)
500
+
501
  if not table_name or not id_field:
502
  logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
503
  return
 
507
 
508
  try:
509
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
510
+ logger.debug(
511
+ f"Successfully deleted {len(ids)} vectors from {self.namespace}"
512
+ )
513
  except Exception as e:
514
  logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
515
 
 
558
  try:
559
  table_name = namespace_to_table_name(self.namespace)
560
  if not table_name:
561
+ return {
562
+ "status": "error",
563
+ "message": f"Unknown namespace: {self.namespace}",
564
+ }
565
+
566
  drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
567
  table_name=table_name
568
  )
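
Closing note: the TiDB storage keeps SQLAlchemy's named-parameter style (":workspace"), in contrast to the positional $1/$2 placeholders on the Postgres side. A minimal sketch of that style against an in-memory SQLite engine — the table and columns here are illustrative only, not LightRAG's schema:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE llm_cache (workspace TEXT, mode TEXT)"))
    conn.execute(
        text("INSERT INTO llm_cache VALUES (:w, :m)"),
        [{"w": "default", "m": "local"}, {"w": "default", "m": "global"}],
    )
    # Named parameters are bound from the dict, mirroring the :workspace usage above.
    conn.execute(
        text("DELETE FROM llm_cache WHERE workspace = :workspace AND mode = :mode"),
        {"workspace": "default", "mode": "local"},
    )
    print(conn.execute(text("SELECT COUNT(*) FROM llm_cache")).scalar())  # 1
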