Zulelee's picture
Upload 254 files
5e9cd1d verified
import json
from typing import List, Dict, Optional
from langchain.schema import Document
from langchain.vectorstores.pgvector import PGVector, DistanceStrategy
from sqlalchemy import text
from configs import kbs_config
from server.knowledge_base.kb_service.base import SupportedVSType, KBService, EmbeddingsFunAdapter, \
score_threshold_process
from server.knowledge_base.utils import KnowledgeFile
import shutil
import sqlalchemy
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import Session
class PGKBService(KBService):
engine: Engine = sqlalchemy.create_engine(kbs_config.get("pg").get("connection_uri"), pool_size=10)
def _load_pg_vector(self):
self.pg_vector = PGVector(embedding_function=EmbeddingsFunAdapter(self.embed_model),
collection_name=self.kb_name,
distance_strategy=DistanceStrategy.EUCLIDEAN,
connection=PGKBService.engine,
connection_string=kbs_config.get("pg").get("connection_uri"))
def get_doc_by_ids(self, ids: List[str]) -> List[Document]:
with Session(PGKBService.engine) as session:
stmt = text("SELECT document, cmetadata FROM langchain_pg_embedding WHERE collection_id in :ids")
results = [Document(page_content=row[0], metadata=row[1]) for row in
session.execute(stmt, {'ids': ids}).fetchall()]
return results
def del_doc_by_ids(self, ids: List[str]) -> bool:
return super().del_doc_by_ids(ids)
def do_init(self):
self._load_pg_vector()
def do_create_kb(self):
pass
def vs_type(self) -> str:
return SupportedVSType.PG
def do_drop_kb(self):
with Session(PGKBService.engine) as session:
session.execute(text(f'''
-- 删除 langchain_pg_embedding 表中关联到 langchain_pg_collection 表中 的记录
DELETE FROM langchain_pg_embedding
WHERE collection_id IN (
SELECT uuid FROM langchain_pg_collection WHERE name = '{self.kb_name}'
);
-- 删除 langchain_pg_collection 表中 记录
DELETE FROM langchain_pg_collection WHERE name = '{self.kb_name}';
'''))
session.commit()
shutil.rmtree(self.kb_path)
def do_search(self, query: str, top_k: int, score_threshold: float):
embed_func = EmbeddingsFunAdapter(self.embed_model)
embeddings = embed_func.embed_query(query)
docs = self.pg_vector.similarity_search_with_score_by_vector(embeddings, top_k)
return score_threshold_process(score_threshold, top_k, docs)
def do_add_doc(self, docs: List[Document], **kwargs) -> List[Dict]:
ids = self.pg_vector.add_documents(docs)
doc_infos = [{"id": id, "metadata": doc.metadata} for id, doc in zip(ids, docs)]
return doc_infos
def do_delete_doc(self, kb_file: KnowledgeFile, **kwargs):
with Session(PGKBService.engine) as session:
filepath = kb_file.filepath.replace('\\', '\\\\')
session.execute(
text(
''' DELETE FROM langchain_pg_embedding WHERE cmetadata::jsonb @> '{"source": "filepath"}'::jsonb;'''.replace(
"filepath", filepath)))
session.commit()
def do_clear_vs(self):
self.pg_vector.delete_collection()
self.pg_vector.create_collection()
if __name__ == '__main__':
from server.db.base import Base, engine
# Base.metadata.create_all(bind=engine)
pGKBService = PGKBService("test")
# pGKBService.create_kb()
# pGKBService.add_doc(KnowledgeFile("README.md", "test"))
# pGKBService.delete_doc(KnowledgeFile("README.md", "test"))
# pGKBService.drop_kb()
print(pGKBService.get_doc_by_ids(["f1e51390-3029-4a19-90dc-7118aaa25772"]))
# print(pGKBService.search_docs("如何启动api服务"))