ParisNeo committed on
Commit
b547d72
·
unverified ·
1 Parent(s): 2cc6794

Create jsondocstatus_storage.py

Browse files
lightrag/storage/jsondocstatus_storage.py ADDED
@@ -0,0 +1,139 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ JsonDocStatus Storage Module
3
+ ============================
4
+
5
+ This module provides a JSON-file-backed storage for document processing status. Status records are held in an in-memory dictionary keyed by document ID and persisted to a `kv_store_<namespace>.json` file in the configured working directory.
6
+
7
+ The `JsonDocStatusStorage` class extends the `DocStatusStorage` class from the LightRAG library, providing methods to filter, count, query, upsert, and delete document status records.
8
+
9
+ Author: lightrag team
10
+ Created: 2024-01-25
11
+ License: MIT
12
+
13
+ Permission is hereby granted, free of charge, to any person obtaining a copy
14
+ of this software and associated documentation files (the "Software"), to deal
15
+ in the Software without restriction, including without limitation the rights
16
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17
+ copies of the Software, and to permit persons to whom the Software is
18
+ furnished to do so, subject to the following conditions:
19
+
20
+ The above copyright notice and this permission notice shall be included in all
21
+ copies or substantial portions of the Software.
22
+
23
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29
+ SOFTWARE.
30
+
31
+ Version: 1.0.0
32
+
33
+ Dependencies:
34
+ - LightRAG
35
+ - NumPy
36
+ - tqdm
38
+
39
+ Features:
40
+ - Load and persist document status records as a JSON file
41
+ - Filter document IDs that still need processing
42
+ - Count documents per processing status
43
+ - Retrieve failed and pending documents
44
+ - Upsert, look up, and delete status records by document ID
45
+
46
+ Usage:
47
+ from lightrag.storage.jsondocstatus_storage import JsonDocStatusStorage
48
+
49
+ """
50
+
51
+
52
+ import asyncio
53
+ import html
54
+ import os
55
+ from tqdm.asyncio import tqdm as tqdm_async
56
+ from dataclasses import dataclass
57
+ from typing import Any, Union, cast, Dict
58
+ import numpy as np
59
+
60
+ import time
61
+
62
+ from lightrag.utils import (
63
+ logger,
64
+ load_json,
65
+ write_json,
66
+ compute_mdhash_id,
67
+ )
68
+
69
+ from lightrag.base import (
70
+ BaseGraphStorage,
71
+ BaseKVStorage,
72
+ BaseVectorStorage,
73
+ DocStatus,
74
+ DocProcessingStatus,
75
+ DocStatusStorage,
76
+ )
77
+
78
+
79
@dataclass
class JsonDocStatusStorage(DocStatusStorage):
    """JSON implementation of document status storage.

    Status records live in an in-memory dictionary keyed by document ID and
    are written to ``kv_store_<namespace>.json`` inside the working directory
    taken from ``global_config["working_dir"]``.
    """

    def __post_init__(self):
        """Resolve the backing file path and load any existing records."""
        working_dir = self.global_config["working_dir"]
        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
        # Fall back to an empty store when load_json returns a falsy value
        # (presumably when the file does not exist yet -- confirm in lightrag.utils).
        self._data = load_json(self._file_name) or {}
        logger.info(f"Loaded document status storage with {len(self._data)} records")

    async def filter_keys(self, data: list[str]) -> set[str]:
        """Return the keys that should be processed.

        A key is kept when it has no record in storage, or when its record's
        ``"status"`` is not ``DocStatus.PROCESSED``.

        Args:
            data: Candidate document IDs.

        Returns:
            The subset of ``data`` that still needs processing.
        """
        # Set comprehension: no intermediate list is built just to be discarded.
        return {
            k
            for k in data
            if k not in self._data or self._data[k]["status"] != DocStatus.PROCESSED
        }

    async def get_status_counts(self) -> Dict[str, int]:
        """Get counts of documents in each status.

        Every ``DocStatus`` member starts at zero, so statuses with no
        documents are still present in the result.

        Raises:
            KeyError: If a stored record has no ``"status"`` entry or its
                status does not match any ``DocStatus`` member.
        """
        counts = {status: 0 for status in DocStatus}
        for doc in self._data.values():
            counts[doc["status"]] += 1
        return counts

    async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
        """Get all stored records whose status is ``DocStatus.FAILED``."""
        return {k: v for k, v in self._data.items() if v["status"] == DocStatus.FAILED}

    async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
        """Get all stored records whose status is ``DocStatus.PENDING``."""
        return {k: v for k, v in self._data.items() if v["status"] == DocStatus.PENDING}

    async def index_done_callback(self):
        """Save the in-memory records to the backing JSON file."""
        write_json(self._data, self._file_name)

    async def upsert(self, data: dict[str, dict]):
        """Update or insert document status records and persist them.

        Args:
            data: Dictionary of document IDs and their status data.

        Returns:
            The same ``data`` mapping that was passed in.
        """
        self._data.update(data)
        # Persist immediately so the file always reflects the latest statuses.
        await self.index_done_callback()
        return data

    async def get_by_id(self, id: str):
        """Get document status by ID; equivalent to :meth:`get`."""
        # Delegate so the lookup logic lives in exactly one place.
        return await self.get(id)

    async def get(self, doc_id: str) -> Union[DocProcessingStatus, None]:
        """Get document status by ID, or ``None`` when no record exists."""
        return self._data.get(doc_id)

    async def delete(self, doc_ids: list[str]):
        """Delete document status records by ID and persist the result.

        IDs that are not present in storage are ignored.

        Args:
            doc_ids: Document IDs to remove.
        """
        for doc_id in doc_ids:
            self._data.pop(doc_id, None)
        # Write once after all removals rather than once per ID.
        await self.index_done_callback()