Spaces:
No application file
No application file
from langchain.embeddings.huggingface import HuggingFaceEmbeddings | |
from vectorstores import MyFAISS | |
from langchain.document_loaders import UnstructuredFileLoader, TextLoader, CSVLoader | |
from configs.model_config import * | |
import datetime | |
from textsplitter import ChineseTextSplitter | |
from typing import List | |
from utils import torch_gc | |
from tqdm import tqdm | |
from pypinyin import lazy_pinyin | |
from loader import UnstructuredPaddleImageLoader, UnstructuredPaddlePDFLoader | |
from models.base import (BaseAnswer, | |
AnswerResult) | |
from models.loader.args import parser | |
from models.loader import LoaderCheckPoint | |
import models.shared as shared | |
from agent import bing_search | |
from langchain.docstore.document import Document | |
from functools import lru_cache | |
# Give HuggingFaceEmbeddings a __hash__ so that instances are hashable.
def _embeddings_hash(self):
    """Return a hash computed from the embedding object's ``model_name``."""
    model_name = self.model_name
    return hash(model_name)


HuggingFaceEmbeddings.__hash__ = _embeddings_hash
# No cache decorator is applied to this function in this file, so every call
# goes through MyFAISS.load_local again.
def load_vector_store(vs_path, embeddings):
    """Load the vector store saved under ``vs_path``.

    Args:
        vs_path: Location passed straight to ``MyFAISS.load_local``.
        embeddings: Embedding object passed straight to ``MyFAISS.load_local``.

    Returns:
        Whatever ``MyFAISS.load_local`` returns for the given arguments.
    """
    store = MyFAISS.load_local(vs_path, embeddings)
    return store
def tree(filepath, ignore_dir_names=None, ignore_file_names=None):
    """Recursively collect the files found under ``filepath``.

    Args:
        filepath: A file or directory path, given as ``str`` or ``os.PathLike``.
        ignore_dir_names: Directory base names that are not descended into.
        ignore_file_names: File base names that are left out of the result.

    Returns:
        A pair ``(full_paths, file_names)`` of equally long lists, where
        ``file_names[i]`` is the base name of ``full_paths[i]``.
        ``(None, None)`` is returned when ``filepath`` does not exist, and
        ``([], [])`` when ``filepath`` is not a path-like value or everything
        was ignored.
    """
    ignore_dir_names = [] if ignore_dir_names is None else ignore_dir_names
    ignore_file_names = [] if ignore_file_names is None else ignore_file_names

    # Accept pathlib-style objects too instead of silently reporting "no files".
    if not isinstance(filepath, (str, os.PathLike)):
        return [], []
    filepath = os.fspath(filepath)

    if not os.path.exists(filepath):
        print("路径不存在")
        return None, None

    base_name = os.path.basename(filepath)
    if os.path.isfile(filepath):
        if base_name not in ignore_file_names:
            return [filepath], [base_name]
        return [], []

    ret_list = []
    if os.path.isdir(filepath) and base_name not in ignore_dir_names:
        # os.listdir order is arbitrary; sort so the result is reproducible.
        for entry in sorted(os.listdir(filepath)):
            full_entry_path = os.path.join(filepath, entry)
            if os.path.isfile(full_entry_path):
                if entry not in ignore_file_names:
                    ret_list.append(full_entry_path)
            elif os.path.isdir(full_entry_path) and entry not in ignore_dir_names:
                ret_list.extend(tree(full_entry_path, ignore_dir_names, ignore_file_names)[0])
    return ret_list, [os.path.basename(p) for p in ret_list]
def load_file(filepath, sentence_size=SENTENCE_SIZE):
    """Load one file into a list of documents, choosing a loader by suffix.

    The suffix check is case-insensitive. ``.md`` and ``.csv`` files are
    loaded as-is; every other type is additionally split with a
    ``ChineseTextSplitter`` built with ``sentence_size``. After loading, the
    result is passed to ``write_check_file`` together with ``filepath``.

    Args:
        filepath: Path of the file to load.
        sentence_size: Forwarded to ``ChineseTextSplitter``.

    Returns:
        The documents produced by the selected loader.
    """
    lowered_path = filepath.lower()

    if lowered_path.endswith(".md"):
        docs = UnstructuredFileLoader(filepath, mode="elements").load()
    elif lowered_path.endswith(".txt"):
        txt_loader = TextLoader(filepath, autodetect_encoding=True)
        splitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
        docs = txt_loader.load_and_split(splitter)
    elif lowered_path.endswith(".pdf"):
        pdf_loader = UnstructuredPaddlePDFLoader(filepath)
        # The splitter is told it is handling PDF text.
        splitter = ChineseTextSplitter(pdf=True, sentence_size=sentence_size)
        docs = pdf_loader.load_and_split(splitter)
    elif lowered_path.endswith((".jpg", ".png")):
        image_loader = UnstructuredPaddleImageLoader(filepath, mode="elements")
        splitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
        docs = image_loader.load_and_split(text_splitter=splitter)
    elif lowered_path.endswith(".csv"):
        docs = CSVLoader(filepath).load()
    else:
        # Any other suffix falls back to the generic unstructured loader.
        fallback_loader = UnstructuredFileLoader(filepath, mode="elements")
        splitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
        docs = fallback_loader.load_and_split(text_splitter=splitter)

    write_check_file(filepath, docs)
    return docs
def write_check_file(filepath, docs):
    """Append a dump of ``docs`` to ``tmp_files/load_file.txt``.

    The ``tmp_files`` folder is created next to ``filepath`` when missing.
    Each call appends one header line ``filepath=<filepath>,len=<len(docs)>``
    followed by ``str(doc)`` for every document, one per line.

    Args:
        filepath: Path of the file the documents were loaded from.
        docs: Sized iterable of objects to dump via ``str``.
    """
    folder_path = os.path.join(os.path.dirname(filepath), "tmp_files")
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(folder_path, exist_ok=True)
    fp = os.path.join(folder_path, 'load_file.txt')
    # The with-block closes the file; no explicit close() is needed.
    with open(fp, 'a', encoding='utf-8') as fout:
        fout.write("filepath=%s,len=%s\n" % (filepath, len(docs)))
        fout.writelines(str(doc) + '\n' for doc in docs)
def generate_prompt(related_docs: List[Document],
                    query: str,
                    prompt_template: str = PROMPT_TEMPLATE, ) -> str:
    """Fill ``prompt_template`` with the query and the retrieved context.

    Args:
        related_docs: Objects exposing ``page_content``; their contents are
            joined with newlines to form the context.
        query: Text substituted for every ``{question}`` placeholder.
        prompt_template: Template containing the literal placeholders
            ``{question}`` and ``{context}``.

    Returns:
        The template with both placeholders substituted.
    """
    import re

    context = "\n".join(doc.page_content for doc in related_docs)
    substitutions = {"{question}": query, "{context}": context}
    # Substitute in a single pass so that a placeholder appearing inside the
    # query or the context text is never expanded a second time.
    return re.sub(r"\{question\}|\{context\}",
                  lambda match: substitutions[match.group(0)],
                  prompt_template)
def search_result2docs(search_results):
    """Convert search result mappings into ``Document`` objects.

    For each result, ``snippet`` becomes the page content, ``link`` the
    ``source`` metadata entry and ``title`` the ``filename`` metadata entry;
    a missing key yields an empty string.
    """
    return [
        Document(
            page_content=result.get("snippet", ""),
            metadata={
                "source": result.get("link", ""),
                "filename": result.get("title", ""),
            },
        )
        for result in search_results
    ]
class LocalDocQA:
    """Question answering on top of a vector store or web search results.

    Holds an LLM (``llm``) and an embedding object (``embeddings``) and
    provides helpers to build, query and maintain vector stores that are
    loaded through ``load_vector_store``.
    """

    llm: BaseAnswer = None
    embeddings: object = None
    # Number of results requested from the vector store per query.
    top_k: int = VECTOR_SEARCH_TOP_K
    # Copied onto the vector store as ``chunk_size`` before searching.
    chunk_size: int = CHUNK_SIZE
    # Copied onto the vector store as ``chunk_conent`` before searching
    # (the spelling is part of the existing attribute interface).
    chunk_conent: bool = True
    # Copied onto the vector store as ``score_threshold`` before searching.
    score_threshold: int = VECTOR_SEARCH_SCORE_THRESHOLD

    def init_cfg(self,
                 embedding_model: str = EMBEDDING_MODEL,
                 embedding_device=EMBEDDING_DEVICE,
                 llm_model: BaseAnswer = None,
                 top_k=VECTOR_SEARCH_TOP_K,
                 ):
        """Attach the LLM and create the embedding object.

        Args:
            embedding_model: Key into ``embedding_model_dict`` selecting the
                model name handed to ``HuggingFaceEmbeddings``.
            embedding_device: Passed as ``model_kwargs={'device': ...}``.
            llm_model: LLM wrapper stored on ``self.llm``.
            top_k: Stored on ``self.top_k``.
        """
        self.llm = llm_model
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model_dict[embedding_model],
                                                model_kwargs={'device': embedding_device})
        self.top_k = top_k

    def init_knowledge_vector_store(self,
                                    filepath: str or List[str],
                                    vs_path: str or os.PathLike = None,
                                    sentence_size=SENTENCE_SIZE):
        """Load files and add them to a new or existing vector store.

        Args:
            filepath: A single file path, a directory path, or an iterable of
                file paths.
            vs_path: Vector store directory. When it already contains
                ``index.faiss`` the documents are added to that store;
                otherwise a new store is built. When empty, a directory name
                is generated under ``KB_ROOT_PATH``.
            sentence_size: Forwarded to ``load_file``.

        Returns:
            ``(vs_path, loaded_files)`` on success and ``(None, loaded_files)``
            when no document could be loaded.
            NOTE(review): a bare ``None`` (not a tuple) is returned when a
            ``str`` path does not exist or a single file fails to load; this
            is kept unchanged because callers may test for ``None``.
        """
        loaded_files = []
        failed_files = []
        # Start empty so an existing path that is neither a file nor a
        # directory ends in the "nothing loaded" branch instead of a NameError.
        docs = []
        if isinstance(filepath, str):
            if not os.path.exists(filepath):
                print("路径不存在")
                return None
            elif os.path.isfile(filepath):
                file = os.path.split(filepath)[-1]
                try:
                    docs = load_file(filepath, sentence_size)
                    logger.info(f"{file} 已成功加载")
                    loaded_files.append(filepath)
                except Exception as e:
                    logger.error(e)
                    logger.info(f"{file} 未能成功加载")
                    return None
            elif os.path.isdir(filepath):
                # The tmp_files folder holds write_check_file dumps; skip it.
                for fullfilepath, file in tqdm(zip(*tree(filepath, ignore_dir_names=['tmp_files'])), desc="加载文件"):
                    try:
                        docs += load_file(fullfilepath, sentence_size)
                        loaded_files.append(fullfilepath)
                    except Exception as e:
                        logger.error(e)
                        failed_files.append(file)

                if len(failed_files) > 0:
                    logger.info("以下文件未能成功加载:")
                    for file in failed_files:
                        logger.info(f"{file}\n")
        else:
            for file in filepath:
                try:
                    # Honour the caller's sentence_size here as in the other branches.
                    docs += load_file(file, sentence_size)
                    logger.info(f"{file} 已成功加载")
                    loaded_files.append(file)
                except Exception as e:
                    logger.error(e)
                    logger.info(f"{file} 未能成功加载")
        if len(docs) > 0:
            logger.info("文件加载完毕,正在生成向量库")
            if vs_path and os.path.isdir(vs_path) and "index.faiss" in os.listdir(vs_path):
                vector_store = load_vector_store(vs_path, self.embeddings)
                vector_store.add_documents(docs)
                torch_gc()
            else:
                if not vs_path:
                    # ``file`` is the last file name bound by the loops above.
                    # Only its base name is used so that an absolute path in
                    # the list cannot make os.path.join drop KB_ROOT_PATH.
                    base_name = os.path.splitext(os.path.basename(file))[0]
                    store_name = "".join(lazy_pinyin(base_name))
                    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                    vs_path = os.path.join(KB_ROOT_PATH,
                                           f"{store_name}_FAISS_{timestamp}",
                                           "vector_store")
                vector_store = MyFAISS.from_documents(docs, self.embeddings)  # docs is a list of Document
                torch_gc()

            vector_store.save_local(vs_path)
            return vs_path, loaded_files
        else:
            logger.info("文件均未成功加载,请检查依赖包或替换为其他文件再次上传。")
            return None, loaded_files

    def one_knowledge_add(self, vs_path, one_title, one_conent, one_content_segmentation, sentence_size):
        """Add a single titled piece of text to the vector store at ``vs_path``.

        Args:
            vs_path: Vector store directory; created from scratch when it does
                not already contain ``index.faiss``.
            one_title: Stored as the document's ``source`` metadata.
            one_conent: Text content of the document.
            one_content_segmentation: When falsy, the text is split with a
                ``ChineseTextSplitter`` before being stored.
            sentence_size: Forwarded to ``ChineseTextSplitter``.

        Returns:
            ``(vs_path, [one_title])`` on success, ``(None, [one_title])`` on
            invalid arguments or any exception (which is logged).
        """
        try:
            if not vs_path or not one_title or not one_conent:
                logger.info("知识库添加错误,请确认知识库名字、标题、内容是否正确!")
                return None, [one_title]
            docs = [Document(page_content=one_conent + "\n", metadata={"source": one_title})]
            if not one_content_segmentation:
                text_splitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
                docs = text_splitter.split_documents(docs)
            if os.path.isdir(vs_path) and os.path.isfile(os.path.join(vs_path, "index.faiss")):
                vector_store = load_vector_store(vs_path, self.embeddings)
                vector_store.add_documents(docs)
            else:
                vector_store = MyFAISS.from_documents(docs, self.embeddings)  # docs is a list of Document
            torch_gc()
            vector_store.save_local(vs_path)
            return vs_path, [one_title]
        except Exception as e:
            logger.error(e)
            return None, [one_title]

    def get_knowledge_based_answer(self, query, vs_path, chat_history=None, streaming: bool = STREAMING):
        """Answer ``query`` using documents retrieved from the vector store.

        Args:
            query: The user question.
            vs_path: Vector store directory to search.
            chat_history: History passed to the LLM; a fresh empty list is
                used when omitted.
            streaming: Forwarded to ``self.llm.generatorAnswer``.

        Yields:
            ``(response, history)`` for every result produced by the LLM,
            where ``response`` has the keys ``query``, ``result`` and
            ``source_documents``.
        """
        # A None default avoids sharing one list object between calls.
        chat_history = [] if chat_history is None else chat_history
        vector_store = load_vector_store(vs_path, self.embeddings)
        vector_store.chunk_size = self.chunk_size
        vector_store.chunk_conent = self.chunk_conent
        vector_store.score_threshold = self.score_threshold
        related_docs_with_score = vector_store.similarity_search_with_score(query, k=self.top_k)
        torch_gc()
        if len(related_docs_with_score) > 0:
            prompt = generate_prompt(related_docs_with_score, query)
        else:
            # Nothing retrieved: send the bare question to the LLM.
            prompt = query

        for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,
                                                      streaming=streaming):
            resp = answer_result.llm_output["answer"]
            history = answer_result.history
            # Record the original question instead of the expanded prompt.
            history[-1][0] = query
            response = {"query": query,
                        "result": resp,
                        "source_documents": related_docs_with_score}
            yield response, history

    # query: the query text
    # vs_path: knowledge base path
    # chunk_conent: whether context association is enabled
    # score_threshold: score threshold for search matches
    # vector_search_top_k: number of knowledge base entries to retrieve (defaults to VECTOR_SEARCH_TOP_K)
    # chunk_size: length of the context joined to a single matched segment
    def get_knowledge_based_conent_test(self, query, vs_path, chunk_conent,
                                        score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
                                        vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_size=CHUNK_SIZE):
        """Run a retrieval-only query against the vector store.

        Returns:
            ``(response, prompt)`` where ``response`` has the keys ``query``
            and ``source_documents`` and ``prompt`` is the newline-joined page
            content of the retrieved documents (``""`` when nothing matched).
        """
        vector_store = load_vector_store(vs_path, self.embeddings)
        vector_store.chunk_conent = chunk_conent
        vector_store.score_threshold = score_threshold
        vector_store.chunk_size = chunk_size
        related_docs_with_score = vector_store.similarity_search_with_score(query, k=vector_search_top_k)
        # Run the cleanup on both the empty and the non-empty result path.
        torch_gc()
        if not related_docs_with_score:
            response = {"query": query,
                        "source_documents": []}
            return response, ""
        prompt = "\n".join([doc.page_content for doc in related_docs_with_score])
        response = {"query": query,
                    "source_documents": related_docs_with_score}
        return response, prompt

    def get_search_result_based_answer(self, query, chat_history=None, streaming: bool = STREAMING):
        """Answer ``query`` using ``bing_search`` results as context.

        Args:
            query: The user question, also used as the search query.
            chat_history: History passed to the LLM; a fresh empty list is
                used when omitted.
            streaming: Forwarded to ``self.llm.generatorAnswer``.

        Yields:
            ``(response, history)`` for every result produced by the LLM,
            where ``response`` has the keys ``query``, ``result`` and
            ``source_documents``.
        """
        # A None default avoids sharing one list object between calls.
        chat_history = [] if chat_history is None else chat_history
        results = bing_search(query)
        result_docs = search_result2docs(results)
        prompt = generate_prompt(result_docs, query)

        for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,
                                                      streaming=streaming):
            resp = answer_result.llm_output["answer"]
            history = answer_result.history
            # Record the original question instead of the expanded prompt.
            history[-1][0] = query
            response = {"query": query,
                        "result": resp,
                        "source_documents": result_docs}
            yield response, history

    def delete_file_from_vector_store(self,
                                      filepath: str or List[str],
                                      vs_path):
        """Call ``delete_doc(filepath)`` on the store at ``vs_path`` and return its status."""
        vector_store = load_vector_store(vs_path, self.embeddings)
        status = vector_store.delete_doc(filepath)
        return status

    def update_file_from_vector_store(self,
                                      filepath: str or List[str],
                                      vs_path,
                                      docs: List[Document],):
        """Call ``update_doc(filepath, docs)`` on the store at ``vs_path`` and return its status."""
        vector_store = load_vector_store(vs_path, self.embeddings)
        status = vector_store.update_doc(filepath, docs)
        return status

    def list_file_from_vector_store(self,
                                    vs_path,
                                    fullpath=False):
        """List the entries reported by the store's ``list_docs``.

        Args:
            vs_path: Vector store directory.
            fullpath: When true, return the entries unchanged; otherwise
                return only the last path component of each entry.
        """
        vector_store = load_vector_store(vs_path, self.embeddings)
        docs = vector_store.list_docs()
        if fullpath:
            return docs
        else:
            return [os.path.split(doc)[-1] for doc in docs]
if __name__ == "__main__":
    # Manual smoke test: load the LLM from fixed arguments and stream one answer.
    args = parser.parse_args(args=['--model-dir', '/media/checkpoint/', '--model', 'chatglm-6b', '--no-remote-model'])
    shared.loaderCheckPoint = LoaderCheckPoint(vars(args))
    llm_model_ins = shared.loaderLLM()
    llm_model_ins.set_history_len(LLM_HISTORY_LEN)

    local_doc_qa = LocalDocQA()
    local_doc_qa.init_cfg(llm_model=llm_model_ins)

    query = "本项目使用的embedding模型是什么,消耗多少显存"
    vs_path = "/media/gpt4-pdf-chatbot-langchain/dev-langchain-ChatGLM/vector_store/test"
    last_print_len = 0
    # Alternative: answer from the local knowledge base instead of web search.
    # for resp, history in local_doc_qa.get_knowledge_based_answer(query=query,
    #                                                              vs_path=vs_path,
    #                                                              chat_history=[],
    #                                                              streaming=True):
    answer_stream = local_doc_qa.get_search_result_based_answer(query=query,
                                                                chat_history=[],
                                                                streaming=True)
    for resp, history in answer_stream:
        # Each response carries the full text so far; print only the new tail.
        full_text = resp["result"]
        print(full_text[last_print_len:], end="", flush=True)
        last_print_len = len(full_text)

    # Log the sources attached to the last response.
    source_text = []
    for inum, doc in enumerate(resp["source_documents"]):
        source = doc.metadata['source']
        if not source.startswith("http"):
            # Non-URL sources are shown by their last path component only.
            source = os.path.split(source)[-1]
        source_text.append(f"""出处 [{inum + 1}] {source}:\n\n{doc.page_content}\n\n""")
    logger.info("\n\n" + "\n\n".join(source_text))