from loguru import logger
import json
import os
from bin_public.utils.utils_db import *
from bin_public.config.presets import MIGRAINE_PROMPT
import PyPDF2
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

PINECONE_API_KEY = os.environ['PINECONE_API_KEY']
PINECONE_API_ENV = os.environ['PINECONE_API_ENV']

def load_local_file_PDF(path, file_name):
    """Read a local PDF and return its text as {'<name>_<i>': chunk} pairs."""
    result = {}
    temp = ''
    with open(path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        for page in pdf_reader.pages:
            temp += page.extract_text()
    # Strip the extension so chunk keys read '<name>_<i>'; fall back to the
    # raw file name so `index` is always defined.
    index = file_name[:-4] if file_name.endswith('.pdf') else file_name
    temp = temp.replace('\n', '').replace('\t', '')
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_text(temp)
    for i, content in enumerate(texts):
        result[f'{index}_{i}'] = content
    return result
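
# Usage sketch (hypothetical path and file name; assumes the PDF has
# extractable text):
#   chunks = load_local_file_PDF('./data/migraine_guide.pdf', 'migraine_guide.pdf')
#   # chunks -> {'migraine_guide_0': '<first ~1000 chars>', 'migraine_guide_1': ...}
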
def holo_query_insert_file_contents(file_name, file_content):
    """Insert one (file_name, content) row into the s_context table."""
    # Double any single quotes so embedded quotes in the chunk text
    # don't break the interpolated SQL string literal.
    file_content = file_content.replace("'", "''")
    run_sql = f"""
    insert into s_context(
        file_name,
        content
    )
    select
        '{file_name}' as file_name,
        '{file_content}' as content
    """
    holo_query_func(run_sql, is_query=0)
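
# Usage sketch (hypothetical values; holo_query_func is provided by utils_db):
#   holo_query_insert_file_contents('migraine_guide_0', 'first chunk of text')
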
def holo_query_get_content(run_sql):
    """Run a query and return the second column of each row, with newlines and tabs stripped."""
    temp = []
    data = holo_query_func(run_sql, is_query=1)
    for i in data:
        temp.append(i[1].replace('\n', '').replace('\t', ''))
    return temp
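
# Usage sketch (hypothetical query against the s_context table shown above):
#   contents = holo_query_get_content('select file_name, content from s_context')
#   # contents -> ['chunk one text', 'chunk two text', ...]
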
def pdf2database(path, file_name):
    """Read a local PDF, split its text into chunks, and store each chunk via Holo."""
    temp = ''
    with open(path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        for page in pdf_reader.pages:
            temp += page.extract_text()
    index = file_name[:-4] if file_name.endswith('.pdf') else file_name
    temp = temp.replace('\n', '').replace('\t', '')
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_text(temp)
    for i, text in enumerate(texts):
        holo_query_insert_file_contents(f'{index}_{i}', text)
        logger.info(f'{index}_{i} stored')
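
# Usage sketch (hypothetical path; writes one s_context row per 1000-char chunk):
#   pdf2database('./data/migraine_guide.pdf', 'migraine_guide.pdf')
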
def load_json(path):
    """Load and return the parsed contents of a JSON file."""
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data


def get_content_from_json(path):
    """Flatten a JSON array of single-key objects into 'key,value' strings."""
    result = []
    data = load_json(path)
    for item in data:
        key = list(item.keys())[0]
        value = item[key]
        result.append(key + ',' + value)
    return result
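
# Usage sketch, assuming a hypothetical JSON file shaped like
# [{"question 1": "answer 1"}, {"question 2": "answer 2"}]:
#   lines = get_content_from_json('./data/qa_pairs.json')
#   # lines -> ['question 1,answer 1', 'question 2,answer 2']
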
def data2embeddings(index_name, data, embeddings):
    """Embed the given texts and upsert them into a Pinecone index."""
    pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_API_ENV)
    Pinecone.from_texts(list(data), embeddings, index_name=index_name)
    logger.info("Stored Successfully")
def context_construction(api_key, query, model, pinecone_api_key, pinecone_api_env, temperature, index_name, mode="map_reduce"):
    """Retrieve the most similar chunks from Pinecone to use as answer context."""
    temp = []
    embeddings = OpenAIEmbeddings(openai_api_key=api_key)
    # llm = OpenAI(temperature=temperature, openai_api_key=api_key, model_name=model)
    pinecone.init(api_key=pinecone_api_key, environment=pinecone_api_env)
    docsearch = Pinecone.from_existing_index(index_name=index_name, embedding=embeddings)
    # chain = load_qa_chain(llm, chain_type=mode)
    if not any(char.isalnum() for char in query):
        # No searchable characters in the query: fall back to the default prompt.
        # The empty context pads the tuple so both branches return (prompt, context, status).
        return MIGRAINE_PROMPT, '', "Connecting to Pinecone"
    else:
        docs = docsearch.similarity_search(query, include_metadata=True, k=2)
        # response = chain.run(input_documents=docs, question=str(query))
        for i in docs:
            temp.append(i.page_content)
        # '用以下资料进行辅助回答' = "use the following material to assist the answer".
        return '用以下资料进行辅助回答\n' + ' '.join(temp), '\n' + ' '.join(temp), "Connecting to Pinecone"
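
# Usage sketch (hypothetical keys, query, and index name):
#   prompt, context, status = context_construction(
#       api_key=os.environ['OPENAI_API_KEY'],
#       query='What triggers migraines?',
#       model='gpt-3.5-turbo',
#       pinecone_api_key=PINECONE_API_KEY,
#       pinecone_api_env=PINECONE_API_ENV,
#       temperature=0,
#       index_name='migraine-index',
#   )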