from typing import List, Dict, Optional from langchain.schema import Document from langchain.vectorstores.milvus import Milvus from configs import kbs_config from server.knowledge_base.kb_service.base import KBService, SupportedVSType, EmbeddingsFunAdapter, \ score_threshold_process from server.knowledge_base.utils import KnowledgeFile class MilvusKBService(KBService): milvus: Milvus @staticmethod def get_collection(milvus_name): from pymilvus import Collection return Collection(milvus_name) def get_doc_by_ids(self, ids: List[str]) -> List[Document]: result = [] if self.milvus.col: # ids = [int(id) for id in ids] # for milvus if needed #pr 2725 data_list = self.milvus.col.query(expr=f'pk in {ids}', output_fields=["*"]) for data in data_list: text = data.pop("text") result.append(Document(page_content=text, metadata=data)) return result def del_doc_by_ids(self, ids: List[str]) -> bool: self.milvus.col.delete(expr=f'pk in {ids}') @staticmethod def search(milvus_name, content, limit=3): search_params = { "metric_type": "L2", "params": {"nprobe": 10}, } c = MilvusKBService.get_collection(milvus_name) return c.search(content, "embeddings", search_params, limit=limit, output_fields=["content"]) def do_create_kb(self): pass def vs_type(self) -> str: return SupportedVSType.MILVUS def _load_milvus(self): self.milvus = Milvus(embedding_function=EmbeddingsFunAdapter(self.embed_model), collection_name=self.kb_name, connection_args=kbs_config.get("milvus"), index_params=kbs_config.get("milvus_kwargs")["index_params"], search_params=kbs_config.get("milvus_kwargs")["search_params"] ) def do_init(self): self._load_milvus() def do_drop_kb(self): if self.milvus.col: self.milvus.col.release() self.milvus.col.drop() def do_search(self, query: str, top_k: int, score_threshold: float): self._load_milvus() embed_func = EmbeddingsFunAdapter(self.embed_model) embeddings = embed_func.embed_query(query) docs = self.milvus.similarity_search_with_score_by_vector(embeddings, top_k) return score_threshold_process(score_threshold, top_k, docs) def do_add_doc(self, docs: List[Document], **kwargs) -> List[Dict]: for doc in docs: for k, v in doc.metadata.items(): doc.metadata[k] = str(v) for field in self.milvus.fields: doc.metadata.setdefault(field, "") doc.metadata.pop(self.milvus._text_field, None) doc.metadata.pop(self.milvus._vector_field, None) ids = self.milvus.add_documents(docs) doc_infos = [{"id": id, "metadata": doc.metadata} for id, doc in zip(ids, docs)] return doc_infos def do_delete_doc(self, kb_file: KnowledgeFile, **kwargs): if self.milvus.col: filepath = kb_file.filepath.replace('\\', '\\\\') delete_list = [item.get("pk") for item in self.milvus.col.query(expr=f'source == "{filepath}"', output_fields=["pk"])] self.milvus.col.delete(expr=f'pk in {delete_list}') def do_clear_vs(self): if self.milvus.col: self.do_drop_kb() self.do_init() if __name__ == '__main__': # 测试建表使用 from server.db.base import Base, engine Base.metadata.create_all(bind=engine) milvusService = MilvusKBService("test") # milvusService.add_doc(KnowledgeFile("README.md", "test")) print(milvusService.get_doc_by_ids(["444022434274215486"])) # milvusService.delete_doc(KnowledgeFile("README.md", "test")) # milvusService.do_drop_kb() # print(milvusService.search_docs("如何启动api服务"))