yangdx committed
Commit b012523 · 1 Parent(s): d5e5dc5

Avoid redundant llm cache updates

lightrag/kg/json_doc_status_impl.py CHANGED
@@ -20,6 +20,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )
 
 
@@ -95,7 +96,7 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
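
The shape of the new guard is the heart of this fix: when LightRAG runs with multiple workers, the update flag from shared_storage holds its boolean behind .value, while in single-process mode it is a plain bool. Testing the wrapper object itself is always truthy, so every index_done_callback flushed to disk even when nothing had changed. A minimal sketch of the failure mode, assuming the shared flag is a multiprocessing.Value (the concrete type in shared_storage is an assumption here); the same guard appears in JsonKVStorage below:

from multiprocessing import Value

is_multiprocess = True
# In multiprocess mode the flag is a shared Value; otherwise a plain bool.
storage_updated = Value("b", False) if is_multiprocess else False

# Buggy check: a Value object is always truthy, even when it holds False,
# so the callback would flush on every call.
if storage_updated:
    print("flushes unconditionally in multiprocess mode")

# Fixed check, mirroring the commit: unwrap .value only when shared.
if (is_multiprocess and storage_updated.value) or (
    not is_multiprocess and storage_updated
):
    print("flushes only when the flag is really set")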
lightrag/kg/json_kv_impl.py CHANGED
@@ -18,6 +18,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )
 
 
@@ -57,7 +58,7 @@ class JsonKVStorage(BaseKVStorage):
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
lightrag/utils.py CHANGED
@@ -705,9 +705,22 @@ class CacheData:
 
 
 async def save_to_cache(hashing_kv, cache_data: CacheData):
-    if hashing_kv is None or hasattr(cache_data.content, "__aiter__"):
+    """Save data to cache, with improved handling for streaming responses and duplicate content.
+
+    Args:
+        hashing_kv: The key-value storage for caching
+        cache_data: The cache data to save
+    """
+    # Skip if storage is None or content is empty
+    if hashing_kv is None or not cache_data.content:
         return
-
+
+    # If content is a streaming response, don't cache it
+    if hasattr(cache_data.content, "__aiter__"):
+        logger.debug("Streaming response detected, skipping cache")
+        return
+
+    # Get existing cache data
     if exists_func(hashing_kv, "get_by_mode_and_id"):
         mode_cache = (
             await hashing_kv.get_by_mode_and_id(cache_data.mode, cache_data.args_hash)
@@ -715,7 +728,15 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         )
     else:
         mode_cache = await hashing_kv.get_by_id(cache_data.mode) or {}
-
+
+    # Check if we already have identical content cached
+    if cache_data.args_hash in mode_cache:
+        existing_content = mode_cache[cache_data.args_hash].get("return")
+        if existing_content == cache_data.content:
+            logger.info(f"Cache content unchanged for {cache_data.args_hash}, skipping update")
+            return
+
+    # Update cache with new content
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
        "cache_type": cache_data.cache_type,
@@ -729,7 +750,8 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         "embedding_max": cache_data.max_val,
         "original_prompt": cache_data.prompt,
     }
-
+
+    # Only upsert if there's actual new content
     await hashing_kv.upsert({cache_data.mode: mode_cache})
 
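
The net effect in save_to_cache is three early exits before any write: missing storage or empty content, streaming content, and content identical to what is already cached. A standalone sketch of the dedup path (the stub storage below is illustrative, not LightRAG's actual BaseKVStorage API):

import asyncio


class StubKV:
    """Tiny in-memory stand-in for hashing_kv (illustrative only)."""

    def __init__(self):
        self.data = {}
        self.upserts = 0

    async def get_by_id(self, key):
        return self.data.get(key)

    async def upsert(self, payload):
        self.upserts += 1
        self.data.update(payload)


async def save(kv, mode, args_hash, content):
    mode_cache = await kv.get_by_id(mode) or {}
    # The check this commit adds: identical cached content means no write.
    if args_hash in mode_cache and mode_cache[args_hash].get("return") == content:
        return
    mode_cache[args_hash] = {"return": content}
    await kv.upsert({mode: mode_cache})


async def main():
    kv = StubKV()
    await save(kv, "default", "abc123", "same answer")
    await save(kv, "default", "abc123", "same answer")  # identical: skipped
    print(kv.upserts)  # -> 1, the redundant second upsert never happens


asyncio.run(main())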