Fix time handling bugs for PostgreSQL
lightrag/kg/postgres_impl.py CHANGED (+49 −34)
@@ -1,8 +1,8 @@
 import asyncio
 import json
 import os
-import time
 import datetime
+from datetime import timezone
 from dataclasses import dataclass, field
 from typing import Any, Union, final
 import numpy as np
@@ -545,7 +545,9 @@ class PGVectorStorage(BaseVectorStorage):
         await ClientManager.release_client(self.db)
         self.db = None
 
-    def _upsert_chunks(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_chunks(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         try:
             upsert_sql = SQL_TEMPLATES["upsert_chunk"]
             data: dict[str, Any] = {
@@ -557,6 +559,8 @@ class PGVectorStorage(BaseVectorStorage):
                 "content": item["content"],
                 "content_vector": json.dumps(item["__vector__"].tolist()),
                 "file_path": item["file_path"],
+                "create_time": current_time,
+                "update_time": current_time,
             }
         except Exception as e:
             logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -564,7 +568,9 @@ class PGVectorStorage(BaseVectorStorage):
 
         return upsert_sql, data
 
-    def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_entities(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_entity"]
         source_id = item["source_id"]
         if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -580,10 +586,14 @@ class PGVectorStorage(BaseVectorStorage):
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
             "file_path": item.get("file_path", None),
+            "create_time": current_time,
+            "update_time": current_time,
         }
         return upsert_sql, data
 
-    def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_relationships(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_relationship"]
         source_id = item["source_id"]
         if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -600,6 +610,8 @@ class PGVectorStorage(BaseVectorStorage):
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
             "file_path": item.get("file_path", None),
+            "create_time": current_time,
+            "update_time": current_time,
         }
         return upsert_sql, data
 
@@ -608,11 +620,13 @@ class PGVectorStorage(BaseVectorStorage):
         if not data:
             return
 
-        current_time = time.time()
+        # Get current time with UTC timezone
+        current_time_with_tz = datetime.datetime.now(timezone.utc)
+        # Remove timezone info to avoid timezone mismatch issues
+        current_time = current_time_with_tz.replace(tzinfo=None)
         list_data = [
             {
                 "__id__": k,
-                "__created_at__": current_time,
                 **{k1: v1 for k1, v1 in v.items()},
             }
             for k, v in data.items()
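An aside on why the replaced time source matters: time.time() returns a float epoch, while the tables below store plain TIMESTAMP values, so the commit pins every write to a naive datetime that encodes UTC. A standalone Python sketch of that behavior (illustrative, not part of the diff; it assumes the asyncpg convention that TIMESTAMP-without-time-zone columns take tz-naive datetime objects):

    import datetime
    from datetime import timezone

    # Aware UTC wall-clock time, then strip tzinfo: the result is a naive
    # datetime that always encodes UTC, whatever timezone the host uses.
    aware = datetime.datetime.now(timezone.utc)
    naive_utc = aware.replace(tzinfo=None)

    # By contrast, datetime.datetime.now() with no argument is host-local
    # wall-clock time and drifts between differently configured servers.
    local = datetime.datetime.now()
    print(naive_utc, local)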
@@ -631,11 +645,11 @@ class PGVectorStorage(BaseVectorStorage):
             d["__vector__"] = embeddings[i]
         for item in list_data:
             if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
-                upsert_sql, data = self._upsert_chunks(item)
+                upsert_sql, data = self._upsert_chunks(item, current_time)
             elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
-                upsert_sql, data = self._upsert_entities(item)
+                upsert_sql, data = self._upsert_entities(item, current_time)
             elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
-                upsert_sql, data = self._upsert_relationships(item)
+                upsert_sql, data = self._upsert_relationships(item, current_time)
             else:
                 raise ValueError(f"{self.namespace} is not supported")
 
@@ -776,7 +790,7 @@
             logger.error(f"Unknown namespace for ID lookup: {self.namespace}")
             return None
 
-        query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id=$2"
+        query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2"
         params = {"workspace": self.db.workspace, "id": id}
 
         try:
@@ -806,7 +820,7 @@
             return []
 
         ids_str = ",".join([f"'{id}'" for id in ids])
-        query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
+        query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
         params = {"workspace": self.db.workspace}
 
         try:
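The new created_at column relies on EXTRACT(EPOCH FROM ...), which for a TIMESTAMP without time zone returns the seconds since 1970-01-01 with the stored value read as UTC; the ::BIGINT cast yields an integer (the TIMESTAMP(0) columns below carry no fractional seconds). This only round-trips correctly because the upsert path above writes naive UTC timestamps. An illustrative query, not from the file:

    SELECT EXTRACT(EPOCH FROM TIMESTAMP '2024-01-01 00:00:00')::BIGINT AS created_at;
    -- created_at = 1704067200, i.e. 2024-01-01T00:00:00Z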
@@ -2222,8 +2236,8 @@ TABLES = {
                     doc_name VARCHAR(1024),
                     content TEXT,
                     meta JSONB,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                     CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
                     )"""
     },
@@ -2237,8 +2251,8 @@ TABLES = {
                     content TEXT,
                     content_vector VECTOR,
                     file_path VARCHAR(256),
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                     CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
                     )"""
     },
@@ -2249,8 +2263,8 @@ TABLES = {
                     entity_name VARCHAR(255),
                     content TEXT,
                     content_vector VECTOR,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                     chunk_ids VARCHAR(255)[] NULL,
                     file_path TEXT NULL,
                     CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
@@ -2264,8 +2278,8 @@ TABLES = {
                     target_id VARCHAR(256),
                     content TEXT,
                     content_vector VECTOR,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                     chunk_ids VARCHAR(255)[] NULL,
                     file_path TEXT NULL,
                     CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
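TIMESTAMP(0) declares second-level precision: PostgreSQL rounds fractional seconds on the way in rather than truncating them, so stored values line up exactly with the integer epoch seconds the queries extract. Illustrative, not from the diff:

    SELECT TIMESTAMP(0) '2024-01-01 12:34:56.789' AS t;
    -- t = 2024-01-01 12:34:57  (rounded to whole seconds)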
@@ -2341,8 +2355,9 @@ SQL_TEMPLATES = {
                       update_time = CURRENT_TIMESTAMP
                      """,
     "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
-                      chunk_order_index, full_doc_id, content, content_vector, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+                      chunk_order_index, full_doc_id, content, content_vector, file_path,
+                      create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET tokens=EXCLUDED.tokens,
                       chunk_order_index=EXCLUDED.chunk_order_index,
@@ -2350,23 +2365,23 @@ SQL_TEMPLATES = {
                       content = EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
                       file_path=EXCLUDED.file_path,
-                      update_time = CURRENT_TIMESTAMP
+                      update_time = EXCLUDED.update_time
                      """,
     # SQL for VectorStorage
     "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
-                      content_vector, chunk_ids, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7)
+                      content_vector, chunk_ids, file_path, create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET entity_name=EXCLUDED.entity_name,
                       content=EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
                       chunk_ids=EXCLUDED.chunk_ids,
                       file_path=EXCLUDED.file_path,
-                      update_time=CURRENT_TIMESTAMP
+                      update_time=EXCLUDED.update_time
                      """,
     "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector, chunk_ids, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8)
+                      target_id, content, content_vector, chunk_ids, file_path, create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET source_id=EXCLUDED.source_id,
                       target_id=EXCLUDED.target_id,
@@ -2374,7 +2389,7 @@ SQL_TEMPLATES = {
                       content_vector=EXCLUDED.content_vector,
                       chunk_ids=EXCLUDED.chunk_ids,
                       file_path=EXCLUDED.file_path,
-                      update_time = CURRENT_TIMESTAMP
+                      update_time = EXCLUDED.update_time
                      """,
     "relationships": """
         WITH relevant_chunks AS (
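Note the asymmetry the templates now encode: both create_time and update_time are bound as parameters on insert, but the conflict branch copies only update_time from EXCLUDED, so re-upserting a row preserves its original create_time while update_time moves forward. A stripped-down sketch of the pattern against a hypothetical table t (not from the file):

    CREATE TABLE t (id INT PRIMARY KEY, create_time TIMESTAMP(0), update_time TIMESTAMP(0));
    INSERT INTO t VALUES (1, '2024-01-01 00:00:00', '2024-01-01 00:00:00');

    -- Later upsert: EXCLUDED.create_time is ignored, EXCLUDED.update_time is applied.
    INSERT INTO t VALUES (1, '2024-06-01 00:00:00', '2024-06-01 00:00:00')
    ON CONFLICT (id) DO UPDATE SET update_time = EXCLUDED.update_time;

    SELECT * FROM t;
    -- id=1 | create_time=2024-01-01 00:00:00 | update_time=2024-06-01 00:00:00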
@@ -2382,9 +2397,9 @@ SQL_TEMPLATES = {
             FROM LIGHTRAG_DOC_CHUNKS
             WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
         )
-        SELECT source_id as src_id, target_id as tgt_id
+        SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
         FROM (
-            SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
+            SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
             FROM LIGHTRAG_VDB_RELATION r
             JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
             WHERE r.workspace=$1
@@ -2399,9 +2414,9 @@ SQL_TEMPLATES = {
             FROM LIGHTRAG_DOC_CHUNKS
             WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
         )
-        SELECT entity_name FROM
+        SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
         (
-            SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
+            SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
             FROM LIGHTRAG_VDB_ENTITY e
             JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
             WHERE e.workspace=$1
@@ -2416,9 +2431,9 @@ SQL_TEMPLATES = {
             FROM LIGHTRAG_DOC_CHUNKS
             WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
         )
-        SELECT id, content, file_path FROM
+        SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
         (
-            SELECT id, content, file_path, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+            SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
             FROM LIGHTRAG_DOC_CHUNKS
             WHERE workspace=$1
             AND id IN (SELECT chunk_id FROM relevant_chunks)
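On the read side, each similarity query now surfaces created_at as integer epoch seconds. Converting that back to an aware datetime in Python is direct; a minimal sketch with a hypothetical result row:

    import datetime
    from datetime import timezone

    row = {"created_at": 1704067200}  # epoch seconds, as selected by the queries above
    created = datetime.datetime.fromtimestamp(row["created_at"], tz=timezone.utc)
    print(created.isoformat())  # 2024-01-01T00:00:00+00:00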