Commit d1215f8 committed by yangdx · 1 Parent(s): 2e31f26

Fix linting

lightrag/api/utils_api.py CHANGED
@@ -359,12 +359,10 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
     # Inject chunk configuration
     args.chunk_size = get_env_value("CHUNK_SIZE", 1200, int)
     args.chunk_overlap_size = get_env_value("CHUNK_OVERLAP_SIZE", 100, int)
-
+
     # Inject LLM cache configuration
     args.enable_llm_cache_for_extract = get_env_value(
-        "ENABLE_LLM_CACHE_FOR_EXTRACT",
-        False,
-        bool
+        "ENABLE_LLM_CACHE_FOR_EXTRACT", False, bool
     )
 
     ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name
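
The reflowed call passes bool as the converter argument to get_env_value. As a reading aid, here is a minimal sketch of how such a helper plausibly behaves; it is not the repo's implementation, and the bool special-case is an assumption (a naive bool("false") would be truthy):

import os
from typing import Any, Callable

def get_env_value(name: str, default: Any, value_type: Callable[[str], Any] = str) -> Any:
    # Hypothetical re-implementation for illustration only
    raw = os.environ.get(name)
    if raw is None:
        return default
    if value_type is bool:
        # bool("false") is True in Python, so a truthy-string check is assumed
        return raw.strip().lower() in ("1", "true", "yes", "on")
    return value_type(raw)

# Usage matching the diff: unset -> False, "true" -> True
enable_cache = get_env_value("ENABLE_LLM_CACHE_FOR_EXTRACT", False, bool)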
lightrag/kg/json_doc_status_impl.py CHANGED
@@ -96,11 +96,15 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
+            if (is_multiprocess and self.storage_updated.value) or (
+                not is_multiprocess and self.storage_updated
+            ):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
-                logger.info(f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}")
+                logger.info(
+                    f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
+                )
                 write_json(data_dict, self._file_name)
                 await clear_all_update_flags(self.namespace)
 
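The condition split across three lines handles two runtime modes: with multiple workers, storage_updated is presumably a shared multiprocessing.Value read through .value, while in single-process mode it is a plain boolean. A standalone sketch of the pattern (the flag setup here is assumed, not taken from the repo):

import multiprocessing

is_multiprocess = True  # assumed toggle; the repo derives this elsewhere
storage_updated = multiprocessing.Value("b", True) if is_multiprocess else True

def flag_is_set() -> bool:
    # Same shape as the condition in index_done_callback above
    return (is_multiprocess and storage_updated.value) or (
        not is_multiprocess and storage_updated
    )

print(flag_is_set())  # True in either mode while the flag is set
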
lightrag/kg/json_kv_impl.py CHANGED
@@ -44,21 +44,28 @@ class JsonKVStorage(BaseKVStorage):
         loaded_data = load_json(self._file_name) or {}
         async with self._storage_lock:
             self._data.update(loaded_data)
-
+
             # Calculate data count based on namespace
             if self.namespace.endswith("cache"):
                 # For cache namespaces, sum the cache entries across all cache types
-                data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
-                                 if isinstance(first_level_dict, dict))
+                data_count = sum(
+                    len(first_level_dict)
+                    for first_level_dict in loaded_data.values()
+                    if isinstance(first_level_dict, dict)
+                )
             else:
                 # For non-cache namespaces, use the original count method
                 data_count = len(loaded_data)
-
-            logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")
+
+            logger.info(
+                f"Process {os.getpid()} KV load {self.namespace} with {data_count} records"
+            )
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
+            if (is_multiprocess and self.storage_updated.value) or (
+                not is_multiprocess and self.storage_updated
+            ):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
@@ -66,17 +73,21 @@ class JsonKVStorage(BaseKVStorage):
                 # Calculate data count based on namespace
                 if self.namespace.endswith("cache"):
                     # # For cache namespaces, sum the cache entries across all cache types
-                    data_count = sum(len(first_level_dict) for first_level_dict in data_dict.values()
-                                     if isinstance(first_level_dict, dict))
+                    data_count = sum(
+                        len(first_level_dict)
+                        for first_level_dict in data_dict.values()
+                        if isinstance(first_level_dict, dict)
+                    )
                 else:
                     # For non-cache namespaces, use the original count method
                     data_count = len(data_dict)
-
-                logger.info(f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}")
+
+                logger.info(
+                    f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
+                )
                 write_json(data_dict, self._file_name)
                 await clear_all_update_flags(self.namespace)
 
-
     async def get_all(self) -> dict[str, Any]:
         """Get all data from storage
 
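Both hunks reformat the same counting expression. It assumes cache namespaces hold a two-level mapping (cache type -> args hash -> entry) and counts the inner entries while skipping any non-dict values. A self-contained example with made-up data:

# Made-up data shaped like a "...cache" namespace
loaded_data = {
    "default": {"hash-a": {"return": "..."}, "hash-b": {"return": "..."}},
    "global": {"hash-c": {"return": "..."}},
    "version": 2,  # non-dict value, skipped by the isinstance guard
}

data_count = sum(
    len(first_level_dict)
    for first_level_dict in loaded_data.values()
    if isinstance(first_level_dict, dict)
)
assert data_count == 3
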
lightrag/kg/shared_storage.py CHANGED
@@ -344,6 +344,7 @@ async def set_all_update_flags(namespace: str):
         else:
             _update_flags[namespace][i] = True
 
+
 async def clear_all_update_flags(namespace: str):
     """Clear all update flag of namespace indicating all workers need to reload data from files"""
     global _update_flags
@@ -360,6 +361,7 @@ async def clear_all_update_flags(namespace: str):
         else:
             _update_flags[namespace][i] = False
 
+
 async def get_all_update_flags_status() -> Dict[str, list]:
     """
     Get update flags status for all namespaces.
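
Both hunks only add the second blank line that PEP 8 formatters (ruff, black) require between top-level definitions; behavior is unchanged. From the context lines, these functions keep one flag per worker per namespace. A simplified single-process sketch of that bookkeeping (the multiprocess branch visible in the diff is omitted, and the setup is assumed):

from typing import Dict

# One boolean per worker, keyed by namespace (assumed layout)
_update_flags: Dict[str, list] = {"doc_status": [False, False, False]}

async def set_all_update_flags(namespace: str) -> None:
    # Mark every worker as needing a reload, as in the else branch above
    for i in range(len(_update_flags[namespace])):
        _update_flags[namespace][i] = True

async def clear_all_update_flags(namespace: str) -> None:
    # Called from index_done_callback once data is flushed to disk
    for i in range(len(_update_flags[namespace])):
        _update_flags[namespace][i] = False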
lightrag/lightrag.py CHANGED
@@ -354,7 +354,9 @@ class LightRAG:
             namespace=make_namespace(
                 self.namespace_prefix, NameSpace.KV_STORE_LLM_RESPONSE_CACHE
             ),
-            global_config=asdict(self),  # Add global_config to ensure cache works properly
+            global_config=asdict(
+                self
+            ),  # Add global_config to ensure cache works properly
             embedding_func=self.embedding_func,
         )
 
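The only change wraps asdict(self) so the line, with its trailing comment, fits the formatter's length limit. Since dataclasses.asdict accepts only dataclass instances, LightRAG must be a dataclass, and the call hands the storage backend a plain-dict snapshot of the configuration. A toy illustration (field names invented):

from dataclasses import dataclass, asdict

@dataclass
class ToyRAG:
    namespace_prefix: str = "demo_"
    chunk_size: int = 1200

global_config = asdict(ToyRAG())
assert global_config == {"namespace_prefix": "demo_", "chunk_size": 1200}
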
lightrag/utils.py CHANGED
@@ -706,7 +706,7 @@ class CacheData:
 
 async def save_to_cache(hashing_kv, cache_data: CacheData):
     """Save data to cache, with improved handling for streaming responses and duplicate content.
-
+
     Args:
         hashing_kv: The key-value storage for caching
         cache_data: The cache data to save
@@ -714,12 +714,12 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
     # Skip if storage is None or content is a streaming response
     if hashing_kv is None or not cache_data.content:
         return
-
+
     # If content is a streaming response, don't cache it
     if hasattr(cache_data.content, "__aiter__"):
         logger.debug("Streaming response detected, skipping cache")
         return
-
+
     # Get existing cache data
     if exists_func(hashing_kv, "get_by_mode_and_id"):
         mode_cache = (
@@ -728,14 +728,16 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         )
     else:
         mode_cache = await hashing_kv.get_by_id(cache_data.mode) or {}
-
+
     # Check if we already have identical content cached
     if cache_data.args_hash in mode_cache:
         existing_content = mode_cache[cache_data.args_hash].get("return")
         if existing_content == cache_data.content:
-            logger.info(f"Cache content unchanged for {cache_data.args_hash}, skipping update")
+            logger.info(
+                f"Cache content unchanged for {cache_data.args_hash}, skipping update"
+            )
             return
-
+
     # Update cache with new content
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
@@ -750,7 +752,7 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         "embedding_max": cache_data.max_val,
         "original_prompt": cache_data.prompt,
     }
-
+
     # Only upsert if there's actual new content
     await hashing_kv.upsert({cache_data.mode: mode_cache})
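
Beyond the whitespace fixes, the reformatted logger call sits inside the duplicate-content check: when the cached "return" value already equals the new content, save_to_cache logs and returns without touching storage. A stripped-down sketch of that short-circuit, with the async storage replaced by a plain dict:

# mode_cache layout follows the surrounding code; the storage is faked here
mode_cache = {"abc123": {"return": "cached answer"}}

def is_duplicate(args_hash: str, content: str) -> bool:
    # Mirrors the check reformatted in the hunk above
    if args_hash in mode_cache:
        return mode_cache[args_hash].get("return") == content
    return False

assert is_duplicate("abc123", "cached answer")   # unchanged -> skip upsert
assert not is_duplicate("abc123", "new answer")  # changed -> upsert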