feat(db): Add data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS
Browse files- lightrag/kg/postgres_impl.py +62 -0
lightrag/kg/postgres_impl.py
CHANGED
@@ -189,6 +189,62 @@ class PostgreSQLDB:
|
|
189 |
# Log error but don't interrupt the process
|
190 |
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
|
191 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
192 |
async def check_tables(self):
|
193 |
# First create all tables
|
194 |
for k, v in TABLES.items():
|
@@ -240,6 +296,12 @@ class PostgreSQLDB:
|
|
240 |
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
|
241 |
# Don't throw an exception, allow the initialization process to continue
|
242 |
|
|
|
|
|
|
|
|
|
|
|
|
|
243 |
async def query(
|
244 |
self,
|
245 |
sql: str,
|
|
|
189 |
# Log error but don't interrupt the process
|
190 |
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
|
191 |
|
192 |
+
async def _migrate_doc_chunks_to_vdb_chunks(self):
    """
    Copy legacy chunk rows from LIGHTRAG_DOC_CHUNKS into LIGHTRAG_VDB_CHUNKS.

    This migration is intended for users who are upgrading and have an older
    table structure where LIGHTRAG_DOC_CHUNKS contained a `content_vector`
    column. The copy is attempted only when all of the following hold:

    1. LIGHTRAG_VDB_CHUNKS contains no rows.
    2. LIGHTRAG_DOC_CHUNKS has a `content_vector` column.
    3. LIGHTRAG_DOC_CHUNKS contains at least one row.

    Emptiness is tested with `LIMIT 1` existence probes instead of
    `COUNT(1)`: only "is there any row?" matters, and a count has to visit
    every row of a potentially large table each time this method runs.

    Returns:
        None. The outcome is reported through the module logger only.

    Raises:
        Nothing. Any exception is logged and swallowed so that the caller's
        initialization can continue.
    """
    try:
        # 1. Skip when the new table already holds data.
        #    Relies on self.query yielding a falsy result when no row
        #    matches, exactly as the column check in step 2 does.
        vdb_has_rows = await self.query(
            "SELECT 1 AS found FROM LIGHTRAG_VDB_CHUNKS LIMIT 1"
        )
        if vdb_has_rows:
            logger.info(
                "Skipping migration: LIGHTRAG_VDB_CHUNKS already contains data."
            )
            return

        # 2. Skip when the old table lacks the legacy `content_vector` column.
        # NOTE(review): this lookup is not schema-qualified, so a same-named
        # table in another schema would also match -- confirm whether a
        # `table_schema` filter is needed for multi-schema deployments.
        check_column_sql = """
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'lightrag_doc_chunks' AND column_name = 'content_vector'
        """
        column_exists = await self.query(check_column_sql)
        if not column_exists:
            logger.info(
                "Skipping migration: `content_vector` not found in LIGHTRAG_DOC_CHUNKS"
            )
            return

        # 3. Skip when the old table has nothing to migrate.
        doc_has_rows = await self.query(
            "SELECT 1 AS found FROM LIGHTRAG_DOC_CHUNKS LIMIT 1"
        )
        if not doc_has_rows:
            logger.info("Skipping migration: LIGHTRAG_DOC_CHUNKS is empty.")
            return

        # 4. Perform the migration in a single INSERT ... SELECT statement.
        #    ON CONFLICT DO NOTHING leaves any row that already exists under
        #    the same (workspace, id) untouched.
        logger.info(
            "Starting data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS..."
        )
        migration_sql = """
            INSERT INTO LIGHTRAG_VDB_CHUNKS (
                id, workspace, full_doc_id, chunk_order_index, tokens, content,
                content_vector, file_path, create_time, update_time
            )
            SELECT
                id, workspace, full_doc_id, chunk_order_index, tokens, content,
                content_vector, file_path, create_time, update_time
            FROM LIGHTRAG_DOC_CHUNKS
            ON CONFLICT (workspace, id) DO NOTHING;
        """
        await self.execute(migration_sql)
        logger.info("Data migration to LIGHTRAG_VDB_CHUNKS completed successfully.")

    except Exception as e:
        logger.error(f"Failed during data migration to LIGHTRAG_VDB_CHUNKS: {e}")
        # Do not re-raise, to allow the application to start
|
247 |
+
|
248 |
async def check_tables(self):
|
249 |
# First create all tables
|
250 |
for k, v in TABLES.items():
|
|
|
296 |
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
|
297 |
# Don't throw an exception, allow the initialization process to continue
|
298 |
|
299 |
+
# Finally, attempt to migrate old doc chunks data if needed
|
300 |
+
try:
|
301 |
+
await self._migrate_doc_chunks_to_vdb_chunks()
|
302 |
+
except Exception as e:
|
303 |
+
logger.error(f"PostgreSQL, Failed to migrate doc_chunks to vdb_chunks: {e}")
|
304 |
+
|
305 |
async def query(
|
306 |
self,
|
307 |
sql: str,
|