from langchain.docstore.document import Document
from configs import EMBEDDING_MODEL, logger
from server.model_workers.base import ApiEmbeddingsParams
from server.utils import BaseResponse, get_model_worker_config, list_embed_models, list_online_embed_models
from fastapi import Body
from fastapi.concurrency import run_in_threadpool
from typing import Dict, List, Optional

# Evaluated once at import time; interpolated into the `embed_model` parameter
# description of `embed_texts_endpoint` below.
online_embed_models = list_online_embed_models()


def embed_texts(
        texts: List[str],
        embed_model: str = EMBEDDING_MODEL,
        to_query: bool = False,
) -> BaseResponse:
    """
    Vectorize a list of texts.

    Args:
        texts: The texts to embed.
        embed_model: Name of the embedding model. It is looked up first among
            the local models (`list_embed_models()`), then among the online
            API models (`list_online_embed_models()`).
        to_query: Forwarded to the online API worker only; the local branch
            does not use it.

    Returns:
        On success, BaseResponse(data=List[List[float]]). If the model is
        unknown or its worker cannot produce embeddings, or if any exception
        is raised along the way, a BaseResponse with code=500 and an
        explanatory msg. Exceptions are logged and never propagated.
    """
    try:
        if embed_model in list_embed_models():  # use a local Embeddings model
            from server.utils import load_local_embeddings

            embeddings = load_local_embeddings(model=embed_model)
            return BaseResponse(data=embeddings.embed_documents(texts))

        if embed_model in list_online_embed_models():  # use an online API
            config = get_model_worker_config(embed_model)
            worker_class = config.get("worker_class")
            # Keep the worker-side model name in its own variable so that the
            # fall-through error below still reports the model the caller
            # actually specified.
            api_embed_model = config.get("embed_model")
            worker = worker_class()
            if worker_class.can_embedding():
                params = ApiEmbeddingsParams(texts=texts, to_query=to_query, embed_model=api_embed_model)
                resp = worker.do_embeddings(params)
                return BaseResponse(**resp)

        # Unknown model, or an online worker without embedding support.
        return BaseResponse(code=500, msg=f"指定的模型 {embed_model} 不支持 Embeddings 功能。")
    except Exception as e:
        logger.error(e)
        return BaseResponse(code=500, msg=f"文本向量化过程中出现错误:{e}")


async def aembed_texts(
        texts: List[str],
        embed_model: str = EMBEDDING_MODEL,
        to_query: bool = False,
) -> BaseResponse:
    """
    Asynchronous counterpart of `embed_texts`.

    Local models are awaited through `aembed_documents`; online API models are
    handled by running the synchronous `embed_texts` in a thread pool.

    Args:
        texts: The texts to embed.
        embed_model: Name of the embedding model (local or online).
        to_query: Forwarded to `embed_texts` for online models; the local
            branch does not use it.

    Returns:
        On success, BaseResponse(data=List[List[float]]). For a model that is
        neither local nor online, or when an exception is raised, a
        BaseResponse with code=500 and an explanatory msg.
    """
    try:
        if embed_model in list_embed_models():  # use a local Embeddings model
            from server.utils import load_local_embeddings

            embeddings = load_local_embeddings(model=embed_model)
            return BaseResponse(data=await embeddings.aembed_documents(texts))

        if embed_model in list_online_embed_models():  # use an online API
            return await run_in_threadpool(embed_texts,
                                           texts=texts,
                                           embed_model=embed_model,
                                           to_query=to_query)

        # Mirror `embed_texts`: an unknown model yields an error response
        # instead of silently returning None.
        return BaseResponse(code=500, msg=f"指定的模型 {embed_model} 不支持 Embeddings 功能。")
    except Exception as e:
        logger.error(e)
        return BaseResponse(code=500, msg=f"文本向量化过程中出现错误:{e}")


def embed_texts_endpoint(
        texts: List[str] = Body(..., description="要嵌入的文本列表", examples=[["hello", "world"]]),
        embed_model: str = Body(EMBEDDING_MODEL,
                                description=f"使用的嵌入模型,除了本地部署的Embedding模型,也支持在线API({online_embed_models})提供的嵌入服务。"),
        to_query: bool = Body(False, description="向量是否用于查询。有些模型如Minimax对存储/查询的向量进行了区分优化。"),
) -> BaseResponse:
    """
    Request handler that vectorizes texts; returns BaseResponse(data=List[List[float]]).

    All three parameters are read from the request body and passed straight
    through to `embed_texts`.
    """
    return embed_texts(texts=texts, embed_model=embed_model, to_query=to_query)


def embed_documents(
        docs: List[Document],
        embed_model: str = EMBEDDING_MODEL,
        to_query: bool = False,
) -> Optional[Dict]:
    """
    Vectorize a List[Document] into arguments accepted by VectorStore.add_embeddings.

    Args:
        docs: Documents whose `page_content` is embedded and whose `metadata`
            is carried along unchanged.
        embed_model: Name of the embedding model, passed to `embed_texts`.
        to_query: Passed to `embed_texts`.

    Returns:
        A dict with the keys "texts", "embeddings" and "metadatas", or None
        when the response from `embed_texts` carries no data.
    """
    texts = [x.page_content for x in docs]
    metadatas = [x.metadata for x in docs]
    embeddings = embed_texts(texts=texts, embed_model=embed_model, to_query=to_query).data
    if embeddings is None:
        # No vectors were produced; callers must handle the missing result.
        return None
    return {
        "texts": texts,
        "embeddings": embeddings,
        "metadatas": metadatas,
    }