Spaces:
Runtime error
Runtime error
# Import required libraries | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.document_loaders import ( | |
UnstructuredWordDocumentLoader, | |
PyMuPDFLoader, | |
UnstructuredFileLoader, | |
) | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.chat_models import ChatOpenAI | |
from langchain.vectorstores import Pinecone | |
from langchain.chains.qa_with_sources import load_qa_with_sources_chain | |
from langchain.retrievers.self_query.base import SelfQueryRetriever | |
from langchain.chains.query_constructor.base import AttributeInfo | |
import os | |
import langchain | |
import pinecone | |
import streamlit as st | |
import shutil | |
import json | |
import re | |
OPENAI_API_KEY = '' | |
PINECONE_API_KEY = '' | |
PINECONE_API_ENV = '' | |
langchain.verbose = False | |
def init(): | |
pinecone_index_name = '' | |
pinecone_namespace = '' | |
docsearch_ready = False | |
directory_name = 'tmp_docs' | |
return pinecone_index_name, pinecone_namespace, docsearch_ready, directory_name | |
def save_file(files): | |
# Remove existing files in the directory | |
if os.path.exists(directory_name): | |
for filename in os.listdir(directory_name): | |
file_path = os.path.join(directory_name, filename) | |
try: | |
if os.path.isfile(file_path): | |
os.remove(file_path) | |
except Exception as e: | |
print(f"Error: {e}") | |
# Save the new file with original filename | |
if files is not None: | |
for file in files: | |
file_name = file.name | |
file_path = os.path.join(directory_name, file_name) | |
with open(file_path, 'wb') as f: | |
shutil.copyfileobj(file, f) | |
def load_files(): | |
all_texts = [] | |
n_files = 0 | |
n_char = 0 | |
n_texts = 0 | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=400, chunk_overlap=50 | |
) | |
for filename in os.listdir(directory_name): | |
file = os.path.join(directory_name, filename) | |
if os.path.isfile(file): | |
if file.endswith(".docx"): | |
loader = UnstructuredWordDocumentLoader(file) | |
elif file.endswith(".pdf"): | |
loader = PyMuPDFLoader(file) | |
else: # assume a pure text format and attempt to load it | |
loader = UnstructuredFileLoader(file) | |
data = loader.load() | |
metadata = data[0].metadata | |
fn = os.path.basename(metadata['source']) | |
author = os.path.splitext(fn)[0] | |
data[0].metadata['author'] = author | |
texts = text_splitter.split_documents(data) | |
n_files += 1 | |
n_char += len(data[0].page_content) | |
n_texts += len(texts) | |
all_texts.extend(texts) | |
st.write( | |
f"Loaded {n_files} file(s) with {n_char} characters, and split into {n_texts} split-documents." | |
) | |
return all_texts, n_texts | |
def ingest(_all_texts, _embeddings, pinecone_index_name, pinecone_namespace): | |
docsearch = Pinecone.from_documents( | |
_all_texts, _embeddings, index_name=pinecone_index_name, namespace=pinecone_namespace) | |
return docsearch | |
def setup_retriever(docsearch, k, llm): | |
metadata_field_info = [ | |
AttributeInfo( | |
name="author", | |
description="The author of the document/text/piece of context", | |
type="string or list[string]", | |
) | |
] | |
document_content_description = "Views/opions/proposals suggested by the author on one or more discussion points." | |
retriever = SelfQueryRetriever.from_llm( | |
llm, docsearch, document_content_description, metadata_field_info, verbose=True) | |
return retriever | |
def setup_docsearch(pinecone_index_name, pinecone_namespace, embeddings): | |
docsearch = [] | |
n_texts = 0 | |
# Load the pre-created Pinecone index. | |
# The index which has already be stored in pinecone.io as long-term memory | |
if pinecone_index_name in pinecone.list_indexes(): | |
docsearch = Pinecone.from_existing_index( | |
index_name=pinecone_index_name, embedding=embeddings, text_key='text', namespace=pinecone_namespace) | |
index_client = pinecone.Index(pinecone_index_name) | |
# Get the index information | |
index_info = index_client.describe_index_stats() | |
n_texts = index_info['namespaces'][pinecone_namespace]['vector_count'] | |
else: | |
raise ValueError('''Cannot find the specified Pinecone index. | |
Create one in pinecone.io or using, e.g., | |
pinecone.create_index( | |
name=index_name, dimension=1536, metric="cosine", shards=1)''') | |
return docsearch, n_texts | |
def get_response(query, chat_history, CRqa): | |
result = CRqa({"question": query, "chat_history": chat_history}) | |
return result['answer'], result['source_documents'] | |
def setup_em_llm(OPENAI_API_KEY, temperature): | |
# Set up OpenAI embeddings | |
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY) | |
# Use Open AI LLM with gpt-3.5-turbo. | |
# Set the temperature to be 0 if you do not want it to make up things | |
llm = ChatOpenAI(temperature=temperature, model_name="gpt-3.5-turbo", streaming=True, | |
openai_api_key=OPENAI_API_KEY) | |
return embeddings, llm | |
def load_chat_history(CHAT_HISTORY_FILENAME): | |
try: | |
with open(CHAT_HISTORY_FILENAME, 'r') as f: | |
chat_history = json.load(f) | |
except (FileNotFoundError, json.JSONDecodeError): | |
chat_history = [] | |
return chat_history | |
def save_chat_history(chat_history, CHAT_HISTORY_FILENAME): | |
with open(CHAT_HISTORY_FILENAME, 'w') as f: | |
json.dump(chat_history, f) | |
pinecone_index_name, pinecone_namespace, docsearch_ready, directory_name = init() | |
def main(pinecone_index_name, pinecone_namespace, docsearch_ready): | |
docsearch_ready = False | |
chat_history = [] | |
# Get user input of whether to use Pinecone or not | |
col1, col2, col3 = st.columns([1, 1, 1]) | |
# create the radio buttons and text input fields | |
with col1: | |
r_ingest = st.radio( | |
'Ingest file(s)?', ('Yes', 'No')) | |
OPENAI_API_KEY = st.text_input( | |
"OpenAI API key:", type="password") | |
with col2: | |
PINECONE_API_KEY = st.text_input( | |
"Pinecone API key:", type="password") | |
PINECONE_API_ENV = st.text_input( | |
"Pinecone API env:", type="password") | |
pinecone_index_name = st.text_input('Pinecone index:') | |
pinecone.init(api_key=PINECONE_API_KEY, | |
environment=PINECONE_API_ENV) | |
with col3: | |
pinecone_namespace = st.text_input('Pinecone namespace:') | |
temperature = st.slider('Temperature', 0.0, 1.0, 0.1) | |
k_sources = st.slider('# source(s) to print out', 0, 20, 2) | |
if pinecone_index_name: | |
session_name = pinecone_index_name | |
embeddings, llm = setup_em_llm(OPENAI_API_KEY, temperature) | |
if r_ingest.lower() == 'yes': | |
files = st.file_uploader( | |
'Upload Files', accept_multiple_files=True) | |
if files: | |
save_file(files) | |
all_texts, n_texts = load_files() | |
docsearch = ingest(all_texts, embeddings, | |
pinecone_index_name, pinecone_namespace) | |
docsearch_ready = True | |
else: | |
st.write( | |
'No data is to be ingested. Make sure the Pinecone index you provided contains data.') | |
docsearch, n_texts = setup_docsearch(pinecone_index_name, pinecone_namespace, | |
embeddings) | |
docsearch_ready = True | |
if docsearch_ready: | |
# number of sources (split-documents when ingesting files); default is 4 | |
k = min([20, n_texts]) | |
retriever = setup_retriever(docsearch, k, llm) | |
CRqa = load_qa_with_sources_chain(llm, chain_type="stuff") | |
st.title('Chatbot') | |
# Get user input | |
query = st.text_area('Enter your question:', height=10, | |
placeholder='Summarize the context.') | |
if query: | |
# Generate a reply based on the user input and chat history | |
CHAT_HISTORY_FILENAME = f"chat_history/{session_name}_chat_hist.json" | |
chat_history = load_chat_history(CHAT_HISTORY_FILENAME) | |
chat_history = [(user, bot) | |
for user, bot in chat_history] | |
docs = retriever.get_relevant_documents(query) | |
if not docs: | |
docs = docsearch.similarity_search(query) | |
result = CRqa.run(input_documents=docs, question=query) | |
reply = re.match(r'(.+?)\.\s*SOURCES:', result).group(1) | |
source = re.search(r'SOURCES:\s*(.+)', result).group(1) | |
# Update the chat history with the user input and system response | |
chat_history.append(('User', query)) | |
chat_history.append(('Bot', reply)) | |
save_chat_history(chat_history, CHAT_HISTORY_FILENAME) | |
latest_chats = chat_history[-4:] | |
chat_history_str = '\n'.join( | |
[f'{x[0]}: {x[1]}' for x in latest_chats]) | |
st.text_area('Chat record:', value=chat_history_str, height=250) | |
if __name__ == '__main__': | |
main(pinecone_index_name, pinecone_namespace, | |
docsearch_ready) | |