File size: 4,158 Bytes
b547d72
225e779
20d5a6e
b547d72
 
 
fd9a2f5
b547d72
 
fd9a2f5
 
 
 
 
1ce985e
 
 
 
 
980a2a9
b547d72
20d5a6e
b547d72
 
 
 
e5f9f74
b547d72
 
5d78930
e5f9f74
1ce985e
e5f9f74
 
1ce985e
 
7fb023a
1ce985e
 
7fb023a
1ce985e
 
 
 
b547d72
20d5a6e
b547d72
7fb023a
5d78930
fd9a2f5
 
 
7fb023a
5d78930
 
 
 
fd9a2f5
b547d72
604fb32
b547d72
bc1b96e
7fb023a
5d78930
 
b547d72
 
7d3ff11
 
 
 
fba7aaa
7fb023a
5d78930
 
 
 
 
 
 
 
 
 
 
 
fba7aaa
2f4b338
20d5a6e
7fb023a
25287b8
 
 
ab86a1f
b547d72
f171ddf
f7fd237
 
 
 
7fb023a
5d78930
b547d72
 
a86ec5a
7fb023a
5d78930
b547d72
 
7fb023a
5d78930
 
38e998f
50e2388
cdd60af
 
7fb023a
5d78930
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from dataclasses import dataclass
import os
from typing import Any, Union, final

from lightrag.base import (
    DocProcessingStatus,
    DocStatus,
    DocStatusStorage,
)
from lightrag.utils import (
    load_json,
    logger,
    write_json,
)
from .shared_storage import (
    get_namespace_data,
    get_storage_lock,
    try_initialize_namespace,
)


@final
@dataclass
class JsonDocStatusStorage(DocStatusStorage):
    """JSON file-backed implementation of document status storage.

    Status records live in a process-shared namespace dict (obtained via
    ``get_namespace_data``) guarded by the shared storage lock, and are
    persisted to ``kv_store_<namespace>.json`` in the configured working
    directory on every mutation via :meth:`index_done_callback`.
    """

    def __post_init__(self):
        working_dir = self.global_config["working_dir"]
        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
        self._storage_lock = get_storage_lock()
        # Populated by initialize(); methods assume it has been called.
        self._data = None

    async def initialize(self):
        """Initialize storage data"""
        # check need_init must before get_namespace_data
        need_init = try_initialize_namespace(self.namespace)
        self._data = await get_namespace_data(self.namespace)
        if need_init:
            # First initializer for this namespace loads the persisted file.
            loaded_data = load_json(self._file_name) or {}
            async with self._storage_lock:
                self._data.update(loaded_data)
                logger.info(
                    f"Loaded document status storage with {len(loaded_data)} records"
                )

    async def filter_keys(self, keys: set[str]) -> set[str]:
        """Return the subset of *keys* not yet present in storage.

        Note: this checks key presence only; it does NOT inspect the
        processing status of existing records.
        """
        async with self._storage_lock:
            return set(keys) - set(self._data.keys())

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
        """Return the stored records for *ids*, skipping missing/empty ones.

        Order follows *ids*; absent ids produce no placeholder entry.
        """
        result: list[dict[str, Any]] = []
        async with self._storage_lock:
            for doc_id in ids:  # avoid shadowing builtin `id`
                data = self._data.get(doc_id, None)
                if data:
                    result.append(data)
        return result

    async def get_status_counts(self) -> dict[str, int]:
        """Get counts of documents in each status"""
        counts = {status.value: 0 for status in DocStatus}
        async with self._storage_lock:
            for doc in self._data.values():
                # Tolerate status values outside DocStatus (e.g. records
                # written by an older schema) instead of raising KeyError.
                status = doc["status"]
                counts[status] = counts.get(status, 0) + 1
        return counts

    async def get_docs_by_status(
        self, status: DocStatus
    ) -> dict[str, DocProcessingStatus]:
        """Get all documents with a specific status"""
        result = {}
        async with self._storage_lock:
            for k, v in self._data.items():
                if v["status"] == status.value:
                    try:
                        # Make a copy of the data to avoid modifying the original
                        data = v.copy()
                        # If content is missing, use content_summary as content
                        if "content" not in data and "content_summary" in data:
                            data["content"] = data["content_summary"]
                        result[k] = DocProcessingStatus(**data)
                    except KeyError as e:
                        # Skip malformed records rather than failing the query.
                        logger.error(f"Missing required field for document {k}: {e}")
                        continue
        return result

    async def index_done_callback(self) -> None:
        """Persist the in-memory data to the JSON file on disk."""
        async with self._storage_lock:
            # Multiprocess proxies expose _getvalue; unwrap to a plain dict
            # so json serialization works.
            data_dict = (
                dict(self._data) if hasattr(self._data, "_getvalue") else self._data
            )
            write_json(data_dict, self._file_name)

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """Insert or update status records, then persist to disk."""
        if not data:
            # Guard before logging so empty calls don't emit "Inserting 0".
            return
        logger.info(f"Inserting {len(data)} to {self.namespace}")

        async with self._storage_lock:
            self._data.update(data)
        await self.index_done_callback()

    async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
        """Return the record for *id*, or None if absent."""
        async with self._storage_lock:
            return self._data.get(id)

    async def delete(self, doc_ids: list[str]) -> None:
        """Remove the given document ids (missing ids are ignored), then persist."""
        async with self._storage_lock:
            for doc_id in doc_ids:
                self._data.pop(doc_id, None)
        await self.index_done_callback()

    async def drop(self) -> None:
        """Drop the storage"""
        # NOTE(review): clears only the in-memory data; the on-disk JSON file
        # is not rewritten here — confirm whether that is intentional.
        async with self._storage_lock:
            self._data.clear()