# Extraction residue from the hosting page ("Spaces: Running") removed;
# this file is a Python module, see imports below.
import json
import os

import pinecone
import PyPDF2
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Pinecone
from loguru import logger

from bin_public.config.presets import MIGRAINE_PROMPT
from bin_public.utils.utils_db import *
# Pinecone connection settings, read once at import time.
# A missing variable raises KeyError immediately (fail fast).
PINECONE_API_KEY = os.environ["PINECONE_API_KEY"]
PINECONE_API_ENV = os.environ["PINECONE_API_ENV"]
def load_local_file_PDF(path, file_name):
    """Read a local PDF and split its text into ~1000-character chunks.

    Args:
        path: Filesystem path of the PDF to read.
        file_name: Display name used to build chunk keys; a trailing
            ``.pdf`` extension is stripped.

    Returns:
        dict mapping ``'<stem>_<i>'`` to the i-th text chunk.
    """
    result = {}
    text = ''
    # Use a context manager so the file handle is closed even on error
    # (the original open() was never closed).
    with open(path, 'rb') as fh:
        reader = PyPDF2.PdfReader(fh)
        for page in reader.pages:
            text += page.extract_text()
    # Fix: previously `index` was only assigned when the name ended in
    # '.pdf', causing a NameError below for any other file name.
    index = file_name[:-4] if file_name.endswith('.pdf') else file_name
    text = text.replace('\n', '').replace('\t', '')
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    for i, content in enumerate(splitter.split_text(text)):
        result[f'{index}_{i}'] = content
    return result
def holo_query_insert_file_contents(file_name, file_content):
    """Insert one (file_name, content) row into the ``s_context`` table.

    SECURITY NOTE: values are interpolated directly into the SQL text.
    Single quotes are doubled so embedded quotes no longer break the
    statement (the original interpolated raw text — classic injection);
    a parameterized query via ``holo_query_func`` would be safer still
    if that helper supports bind variables — TODO confirm.
    """
    # Standard SQL escaping: ' -> ''
    safe_name = file_name.replace("'", "''")
    safe_content = file_content.replace("'", "''")
    run_sql = f"""
    insert into s_context(
        file_name,
        content
    )
    select
        '{safe_name}' as file_name,
        '{safe_content}' as content
    """
    holo_query_func(run_sql, is_query=0)
def holo_query_get_content(run_sql):
    """Execute *run_sql* and return column 1 of every row with
    newlines and tabs stripped out."""
    rows = holo_query_func(run_sql, is_query=1)
    return [row[1].replace('\n', '').replace('\t', '') for row in rows]
def pdf2database(path, file_name):
    """Read a PDF, chunk its text, and store each chunk via
    ``holo_query_insert_file_contents``.

    Args:
        path: Filesystem path of the PDF to read.
        file_name: Name used to build chunk keys; a trailing ``.pdf``
            extension is stripped.
    """
    text = ''
    # Context manager closes the handle; the original leaked it.
    with open(path, 'rb') as fh:
        reader = PyPDF2.PdfReader(fh)
        for page in reader.pages:
            text += page.extract_text()
    # Fix: `index` was undefined for names not ending in '.pdf',
    # which raised a NameError in the loop below.
    index = file_name[:-4] if file_name.endswith('.pdf') else file_name
    text = text.replace('\n', '').replace('\t', '')
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    for i, chunk in enumerate(splitter.split_text(text)):
        holo_query_insert_file_contents(f'{index}_{i}', chunk)
        logger.info(f'{index}_{i} stored')
def load_json(path):
    """Parse the UTF-8 JSON file at *path* and return the decoded object."""
    with open(path, encoding='utf-8') as handle:
        return json.load(handle)
def get_content_from_json(path):
    """Load a JSON array of one-pair objects and return a list of
    ``'key,value'`` strings (first key of each object)."""
    contents = []
    for entry in load_json(path):
        first_key = next(iter(entry))
        contents.append(first_key + ',' + entry[first_key])
    return contents
def data2embeddings(index_name, data, embeddings):
    """Embed every text in *data* and upsert the vectors into the
    Pinecone index *index_name*."""
    pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_API_ENV)
    Pinecone.from_texts(list(data), embeddings, index_name=index_name)
    logger.info("Stored Successfully")
def context_construction(api_key, query, model, pinecone_api_key, pinecone_api_env, temperature, index_name, mode="map_reduce"):
    """Retrieve supporting context for *query* from a Pinecone index.

    Queries with no alphanumeric character short-circuit to the canned
    MIGRAINE_PROMPT; otherwise the top-2 similar documents are joined
    into an auxiliary-answer prompt.

    NOTE(review): the two branches return tuples of different lengths
    (2 vs 3 elements) — confirm callers unpack both shapes correctly.
    `model`, `temperature` and `mode` are only used by the commented-out
    QA-chain code below.
    """
    embeddings = OpenAIEmbeddings(openai_api_key=api_key)
    # llm = OpenAI(temperature=temperature, openai_api_key=api_key, model_name=model)
    pinecone.init(api_key=pinecone_api_key, environment=pinecone_api_env)
    docsearch = Pinecone.from_existing_index(index_name=index_name, embedding=embeddings)
    # chain = load_qa_chain(llm, chain_type=mode)
    if not any(ch.isalnum() for ch in query):
        return MIGRAINE_PROMPT, "Connecting to Pinecone"
    docs = docsearch.similarity_search(query, include_metadata=True, k=2)
    # response = chain.run(input_documents=docs, question=str(query))
    joined = ' '.join(doc.page_content for doc in docs)
    return '用以下资料进行辅助回答\n' + joined, '\n' + joined, "Connecting to Pinecone"