from dataclasses import dataclass
import os
from typing import Any, Union, final

from lightrag.base import (
    DocProcessingStatus,
    DocStatus,
    DocStatusStorage,
)
from lightrag.utils import (
    load_json,
    logger,
    write_json,
)

from .shared_storage import (
    get_namespace_data,
    get_storage_lock,
    try_initialize_namespace,
)


@final
@dataclass
class JsonDocStatusStorage(DocStatusStorage):
    """JSON implementation of document status storage"""

    def __post_init__(self):
        working_dir = self.global_config["working_dir"]
        # JSON file that persists document status across runs
        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
        self._storage_lock = get_storage_lock()
        self._data = None

    async def initialize(self):
        """Initialize storage data"""
        # try_initialize_namespace must be called before get_namespace_data
        need_init = try_initialize_namespace(self.namespace)
        self._data = await get_namespace_data(self.namespace)
        if need_init:
            loaded_data = load_json(self._file_name) or {}
            async with self._storage_lock:
                self._data.update(loaded_data)
            logger.info(
                f"Loaded document status storage with {len(loaded_data)} records"
            )

    async def filter_keys(self, keys: set[str]) -> set[str]:
        """Return keys that are not yet in storage (i.e., still need processing)"""
        async with self._storage_lock:
            return set(keys) - set(self._data.keys())

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        """Return the stored records for the given ids, skipping missing ones"""
        result: list[dict[str, Any]] = []
        async with self._storage_lock:
            for doc_id in ids:
                data = self._data.get(doc_id)
                if data:
                    result.append(data)
        return result

    async def get_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status"""
        counts = {status.value: 0 for status in DocStatus}
        async with self._storage_lock:
            for doc in self._data.values():
                counts[doc["status"]] += 1
        return counts

    async def get_docs_by_status(
        self, status: DocStatus
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific status"""
        result = {}
        async with self._storage_lock:
            for k, v in self._data.items():
                if v["status"] == status.value:
                    try:
                        # Make a copy of the data to avoid modifying the original
                        data = v.copy()
                        # If content is missing, use content_summary as content
                        if "content" not in data and "content_summary" in data:
                            data["content"] = data["content_summary"]
                        result[k] = DocProcessingStatus(**data)
                    except KeyError as e:
                        logger.error(f"Missing required field for document {k}: {e}")
                        continue
        return result

    async def index_done_callback(self) -> None:
        """Persist the in-memory data to the JSON file"""
        async with self._storage_lock:
            # A multiprocessing Manager proxy exposes _getvalue(); convert it
            # to a plain dict before serializing
            data_dict = (
                dict(self._data) if hasattr(self._data, "_getvalue") else self._data
            )
            write_json(data_dict, self._file_name)

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Insert or update status records, then persist to disk"""
        if not data:
            return
        logger.info(f"Inserting {len(data)} records into {self.namespace}")
        async with self._storage_lock:
            self._data.update(data)
        await self.index_done_callback()

    async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
        """Return the record for a single id, or None if absent"""
        async with self._storage_lock:
            return self._data.get(id)

    async def delete(self, doc_ids: list[str]) -> None:
        """Remove the given document ids and persist the change"""
        async with self._storage_lock:
            for doc_id in doc_ids:
                self._data.pop(doc_id, None)
        await self.index_done_callback()

    async def drop(self) -> None:
        """Drop the storage"""
        async with self._storage_lock:
            self._data.clear()
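

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the library API). It assumes
# the inherited dataclass exposes `namespace` and `global_config` fields, as
# the methods above suggest; the exact constructor signature of
# DocStatusStorage may differ across lightrag versions, so treat this as a
# hedged example rather than canonical usage.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # Hypothetical construction; adjust field names to your lightrag version
        storage = JsonDocStatusStorage(
            namespace="doc_status",
            global_config={"working_dir": "."},
        )
        await storage.initialize()
        # Upsert one pending document, then read back the per-status counts
        await storage.upsert(
            {"doc-1": {"status": DocStatus.PENDING.value, "content_summary": "..."}}
        )
        print(await storage.get_status_counts())

    asyncio.run(_demo())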