yangdx commited on
Commit
08e5593
·
1 Parent(s): 82a2eed

Add drop_cace_by_modes to all KV storage implementation

Browse files
lightrag/base.py CHANGED
@@ -259,6 +259,20 @@ class BaseKVStorage(StorageNameSpace, ABC):
259
  None
260
  """
261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
262
 
263
  @dataclass
264
  class BaseGraphStorage(StorageNameSpace, ABC):
@@ -310,7 +324,6 @@ class BaseGraphStorage(StorageNameSpace, ABC):
310
  KG-storage-log should be used to avoid data corruption
311
  """
312
 
313
-
314
  @abstractmethod
315
  async def delete_node(self, node_id: str) -> None:
316
  """Embed nodes using an algorithm."""
@@ -381,6 +394,10 @@ class DocStatusStorage(BaseKVStorage, ABC):
381
  ) -> dict[str, DocProcessingStatus]:
382
  """Get all documents with a specific status"""
383
 
 
 
 
 
384
 
385
  class StoragesStatus(str, Enum):
386
  """Storages status"""
 
259
  None
260
  """
261
 
262
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
263
+ """Delete specific records from storage by cache mode
264
+
265
+ Importance notes for in-memory storage:
266
+ 1. Changes will be persisted to disk during the next index_done_callback
267
+ 2. update flags to notify other processes that data persistence is needed
268
+
269
+ Args:
270
+ modes (list[str]): List of cache modes to be dropped from storage
271
+
272
+ Returns:
273
+ True: if the cache drop successfully
274
+ False: if the cache drop failed, or the cache mode is not supported
275
+ """
276
 
277
  @dataclass
278
  class BaseGraphStorage(StorageNameSpace, ABC):
 
324
  KG-storage-log should be used to avoid data corruption
325
  """
326
 
 
327
  @abstractmethod
328
  async def delete_node(self, node_id: str) -> None:
329
  """Embed nodes using an algorithm."""
 
394
  ) -> dict[str, DocProcessingStatus]:
395
  """Get all documents with a specific status"""
396
 
397
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
398
+ """Drop cache is not supported for Doc Status storage"""
399
+ return False
400
+
401
 
402
  class StoragesStatus(str, Enum):
403
  """Storages status"""
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -127,7 +127,7 @@ class JsonDocStatusStorage(DocStatusStorage):
127
  async with self._storage_lock:
128
  return self._data.get(id)
129
 
130
- async def delete(self, doc_ids: list[str]):
131
  """Delete specific records from storage by their IDs
132
 
133
  Importance notes for in-memory storage:
 
127
  async with self._storage_lock:
128
  return self._data.get(id)
129
 
130
+ async def delete(self, doc_ids: list[str]) -> None:
131
  """Delete specific records from storage by their IDs
132
 
133
  Importance notes for in-memory storage:
lightrag/kg/json_kv_impl.py CHANGED
@@ -144,6 +144,30 @@ class JsonKVStorage(BaseKVStorage):
144
  self._data.pop(doc_id, None)
145
  await set_all_update_flags(self.namespace)
146
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  async def drop(self) -> dict[str, str]:
148
  """Drop all data from storage and clean up resources
149
  This action will persistent the data to disk immediately.
 
144
  self._data.pop(doc_id, None)
145
  await set_all_update_flags(self.namespace)
146
 
147
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
148
+ """Delete specific records from storage by by cache mode
149
+
150
+ Importance notes for in-memory storage:
151
+ 1. Changes will be persisted to disk during the next index_done_callback
152
+ 2. update flags to notify other processes that data persistence is needed
153
+
154
+ Args:
155
+ ids (list[str]): List of cache mode to be drop from storage
156
+
157
+ Returns:
158
+ True: if the cache drop successfully
159
+ False: if the cache drop failed
160
+ """
161
+ if not modes:
162
+ return False
163
+
164
+ try:
165
+ await self.delete(modes)
166
+ return True
167
+ except Exception:
168
+ return False
169
+
170
+
171
  async def drop(self) -> dict[str, str]:
172
  """Drop all data from storage and clean up resources
173
  This action will persistent the data to disk immediately.
lightrag/kg/mongo_impl.py CHANGED
@@ -165,6 +165,28 @@ class MongoKVStorage(BaseKVStorage):
165
  except PyMongoError as e:
166
  logger.error(f"Error deleting documents from {self.namespace}: {e}")
167
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  async def drop(self) -> dict[str, str]:
169
  """Drop the storage by removing all documents in the collection.
170
 
 
165
  except PyMongoError as e:
166
  logger.error(f"Error deleting documents from {self.namespace}: {e}")
167
 
168
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
169
+ """Delete specific records from storage by cache mode
170
+
171
+ Args:
172
+ modes (list[str]): List of cache modes to be dropped from storage
173
+
174
+ Returns:
175
+ bool: True if successful, False otherwise
176
+ """
177
+ if not modes:
178
+ return False
179
+
180
+ try:
181
+ # Build regex pattern to match documents with the specified modes
182
+ pattern = f"^({'|'.join(modes)})_"
183
+ result = await self._data.delete_many({"_id": {"$regex": pattern}})
184
+ logger.info(f"Deleted {result.deleted_count} documents by modes: {modes}")
185
+ return True
186
+ except Exception as e:
187
+ logger.error(f"Error deleting cache by modes {modes}: {e}")
188
+ return False
189
+
190
  async def drop(self) -> dict[str, str]:
191
  """Drop the storage by removing all documents in the collection.
192
 
lightrag/kg/oracle_impl.py CHANGED
@@ -392,32 +392,63 @@ class OracleKVStorage(BaseKVStorage):
392
  # Oracle handles persistence automatically
393
  pass
394
 
395
- async def delete(self, ids: list[str]) -> dict[str, str]:
396
  """Delete records with specified IDs from the storage.
397
 
398
  Args:
399
  ids: List of record IDs to be deleted
400
-
401
- Returns:
402
- Dictionary with status and message
403
  """
404
  if not ids:
405
- return {"status": "success", "message": "No IDs provided for deletion"}
406
 
407
  try:
408
  table_name = namespace_to_table_name(self.namespace)
409
  if not table_name:
410
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
 
411
 
412
  ids_list = ",".join([f"'{id}'" for id in ids])
413
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"
414
 
415
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
416
  logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
417
- return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
418
  except Exception as e:
419
  logger.error(f"Error deleting records from {self.namespace}: {e}")
420
- return {"status": "error", "message": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
421
 
422
  async def drop(self) -> dict[str, str]:
423
  """Drop the storage"""
 
392
  # Oracle handles persistence automatically
393
  pass
394
 
395
+ async def delete(self, ids: list[str]) -> None:
396
  """Delete records with specified IDs from the storage.
397
 
398
  Args:
399
  ids: List of record IDs to be deleted
 
 
 
400
  """
401
  if not ids:
402
+ return
403
 
404
  try:
405
  table_name = namespace_to_table_name(self.namespace)
406
  if not table_name:
407
+ logger.error(f"Unknown namespace for deletion: {self.namespace}")
408
+ return
409
 
410
  ids_list = ",".join([f"'{id}'" for id in ids])
411
  delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"
412
 
413
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
414
  logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
 
415
  except Exception as e:
416
  logger.error(f"Error deleting records from {self.namespace}: {e}")
417
+
418
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
419
+ """Delete specific records from storage by cache mode
420
+
421
+ Args:
422
+ modes (list[str]): List of cache modes to be dropped from storage
423
+
424
+ Returns:
425
+ bool: True if successful, False otherwise
426
+ """
427
+ if not modes:
428
+ return False
429
+
430
+ try:
431
+ table_name = namespace_to_table_name(self.namespace)
432
+ if not table_name:
433
+ return False
434
+
435
+ if table_name != "LIGHTRAG_LLM_CACHE":
436
+ return False
437
+
438
+ # 构建Oracle风格的IN查询
439
+ modes_list = ", ".join([f"'{mode}'" for mode in modes])
440
+ sql = f"""
441
+ DELETE FROM {table_name}
442
+ WHERE workspace = :workspace
443
+ AND cache_mode IN ({modes_list})
444
+ """
445
+
446
+ logger.info(f"Deleting cache by modes: {modes}")
447
+ await self.db.execute(sql, {"workspace": self.db.workspace})
448
+ return True
449
+ except Exception as e:
450
+ logger.error(f"Error deleting cache by modes {modes}: {e}")
451
+ return False
452
 
453
  async def drop(self) -> dict[str, str]:
454
  """Drop the storage"""
lightrag/kg/postgres_impl.py CHANGED
@@ -378,6 +378,67 @@ class PGKVStorage(BaseKVStorage):
378
  # PG handles persistence automatically
379
  pass
380
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
381
  async def drop(self) -> dict[str, str]:
382
  """Drop the storage"""
383
  try:
@@ -558,13 +619,10 @@ class PGVectorStorage(BaseVectorStorage):
558
  logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
559
  return
560
 
561
- ids_list = ",".join([f"'{id}'" for id in ids])
562
- delete_sql = (
563
- f"DELETE FROM {table_name} WHERE workspace=$1 AND id IN ({ids_list})"
564
- )
565
 
566
  try:
567
- await self.db.execute(delete_sql, {"workspace": self.db.workspace})
568
  logger.debug(
569
  f"Successfully deleted {len(ids)} vectors from {self.namespace}"
570
  )
 
378
  # PG handles persistence automatically
379
  pass
380
 
381
+ async def delete(self, ids: list[str]) -> None:
382
+ """Delete specific records from storage by their IDs
383
+
384
+ Args:
385
+ ids (list[str]): List of document IDs to be deleted from storage
386
+
387
+ Returns:
388
+ None
389
+ """
390
+ if not ids:
391
+ return
392
+
393
+ table_name = namespace_to_table_name(self.namespace)
394
+ if not table_name:
395
+ logger.error(f"Unknown namespace for deletion: {self.namespace}")
396
+ return
397
+
398
+ delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
399
+
400
+ try:
401
+ await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
402
+ logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}")
403
+ except Exception as e:
404
+ logger.error(f"Error while deleting records from {self.namespace}: {e}")
405
+
406
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
407
+ """Delete specific records from storage by cache mode
408
+
409
+ Args:
410
+ modes (list[str]): List of cache modes to be dropped from storage
411
+
412
+ Returns:
413
+ bool: True if successful, False otherwise
414
+ """
415
+ if not modes:
416
+ return False
417
+
418
+ try:
419
+ table_name = namespace_to_table_name(self.namespace)
420
+ if not table_name:
421
+ return False
422
+
423
+ if table_name != "LIGHTRAG_LLM_CACHE":
424
+ return False
425
+
426
+ sql = f"""
427
+ DELETE FROM {table_name}
428
+ WHERE workspace = $1 AND mode = ANY($2)
429
+ """
430
+ params = {
431
+ "workspace": self.db.workspace,
432
+ "modes": modes
433
+ }
434
+
435
+ logger.info(f"Deleting cache by modes: {modes}")
436
+ await self.db.execute(sql, params)
437
+ return True
438
+ except Exception as e:
439
+ logger.error(f"Error deleting cache by modes {modes}: {e}")
440
+ return False
441
+
442
  async def drop(self) -> dict[str, str]:
443
  """Drop the storage"""
444
  try:
 
619
  logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
620
  return
621
 
622
+ delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
 
 
 
623
 
624
  try:
625
+ await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
626
  logger.debug(
627
  f"Successfully deleted {len(ids)} vectors from {self.namespace}"
628
  )
lightrag/kg/redis_impl.py CHANGED
@@ -84,6 +84,28 @@ class RedisKVStorage(BaseKVStorage):
84
  f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
85
  )
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  async def drop(self) -> dict[str, str]:
88
  """Drop the storage by removing all keys under the current namespace.
89
 
 
84
  f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
85
  )
86
 
87
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
88
+ """Delete specific records from storage by by cache mode
89
+
90
+ Importance notes for Redis storage:
91
+ 1. This will immediately delete the specified cache modes from Redis
92
+
93
+ Args:
94
+ modes (list[str]): List of cache mode to be drop from storage
95
+
96
+ Returns:
97
+ True: if the cache drop successfully
98
+ False: if the cache drop failed
99
+ """
100
+ if not modes:
101
+ return False
102
+
103
+ try:
104
+ await self.delete(modes)
105
+ return True
106
+ except Exception:
107
+ return False
108
+
109
  async def drop(self) -> dict[str, str]:
110
  """Drop the storage by removing all keys under the current namespace.
111
 
lightrag/kg/tidb_impl.py CHANGED
@@ -278,34 +278,65 @@ class TiDBKVStorage(BaseKVStorage):
278
  # Ti handles persistence automatically
279
  pass
280
 
281
- async def delete(self, ids: list[str]) -> dict[str, str]:
282
  """Delete records with specified IDs from the storage.
283
 
284
  Args:
285
  ids: List of record IDs to be deleted
286
-
287
- Returns:
288
- Dictionary with status and message
289
  """
290
  if not ids:
291
- return {"status": "success", "message": "No IDs provided for deletion"}
292
 
293
  try:
294
  table_name = namespace_to_table_name(self.namespace)
295
  id_field = namespace_to_id(self.namespace)
296
 
297
  if not table_name or not id_field:
298
- return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
 
299
 
300
  ids_list = ",".join([f"'{id}'" for id in ids])
301
  delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"
302
 
303
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
304
  logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
305
- return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
306
  except Exception as e:
307
  logger.error(f"Error deleting records from {self.namespace}: {e}")
308
- return {"status": "error", "message": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
309
 
310
  async def drop(self) -> dict[str, str]:
311
  """Drop the storage"""
 
278
  # Ti handles persistence automatically
279
  pass
280
 
281
+ async def delete(self, ids: list[str]) -> None:
282
  """Delete records with specified IDs from the storage.
283
 
284
  Args:
285
  ids: List of record IDs to be deleted
 
 
 
286
  """
287
  if not ids:
288
+ return
289
 
290
  try:
291
  table_name = namespace_to_table_name(self.namespace)
292
  id_field = namespace_to_id(self.namespace)
293
 
294
  if not table_name or not id_field:
295
+ logger.error(f"Unknown namespace for deletion: {self.namespace}")
296
+ return
297
 
298
  ids_list = ",".join([f"'{id}'" for id in ids])
299
  delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"
300
 
301
  await self.db.execute(delete_sql, {"workspace": self.db.workspace})
302
  logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
 
303
  except Exception as e:
304
  logger.error(f"Error deleting records from {self.namespace}: {e}")
305
+
306
+ async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
307
+ """Delete specific records from storage by cache mode
308
+
309
+ Args:
310
+ modes (list[str]): List of cache modes to be dropped from storage
311
+
312
+ Returns:
313
+ bool: True if successful, False otherwise
314
+ """
315
+ if not modes:
316
+ return False
317
+
318
+ try:
319
+ table_name = namespace_to_table_name(self.namespace)
320
+ if not table_name:
321
+ return False
322
+
323
+ if table_name != "LIGHTRAG_LLM_CACHE":
324
+ return False
325
+
326
+ # 构建MySQL风格的IN查询
327
+ modes_list = ", ".join([f"'{mode}'" for mode in modes])
328
+ sql = f"""
329
+ DELETE FROM {table_name}
330
+ WHERE workspace = :workspace
331
+ AND mode IN ({modes_list})
332
+ """
333
+
334
+ logger.info(f"Deleting cache by modes: {modes}")
335
+ await self.db.execute(sql, {"workspace": self.db.workspace})
336
+ return True
337
+ except Exception as e:
338
+ logger.error(f"Error deleting cache by modes {modes}: {e}")
339
+ return False
340
 
341
  async def drop(self) -> dict[str, str]:
342
  """Drop the storage"""