# RAG-Demonstration / api_interface.py
# cmarley314's picture
# Update api_interface.py
# 4986502 verified
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
from tqdm import tqdm
from openai import OpenAI
import string
import pickle
import os
import time
from langchain_community.document_loaders import PyMuPDFLoader
class API_Interface:
    """Retrieval and chat back end for the solar-eclipse RAG demo.

    On construction the class:

    1. loads ``Solar Eclipse Information.pdf`` and ``Solar Eclipse Table.pdf``
       from the current working directory and turns them into text chunks;
    2. makes sure the Pinecone index ``eep596mp2`` holds one namespace with
       the prose chunks and one with the table pages, embedding and
       upserting them when the namespace is missing or empty;
    3. exposes LangChain vector stores over that index for similarity search.

    Attributes set by ``__init__``: ``chunk_size``, ``embed_model``,
    ``chat_model``, ``chunked_texts``/``chunked_pnums``,
    ``table_texts``/``table_pnums``, ``namespace``/``vectorstore`` and
    ``tablespace``/``tablestore``.  ``embeddings`` / ``tmbeddings`` are only
    set when the corresponding namespace had to be (re)built.
    """

    # Column names prepended to every table page so each chunk is
    # self-describing when it is embedded and later shown to the chat model.
    _TABLE_HEADER = (
        "Catalog Number, Canon Plate, Calendar Date, "
        "Terrestrial Dynamical Time of Greatest Eclipse, UT - TD (s), "
        "Luna Number, Saros Number, Eclipse Type, QLE, Gamma, "
        "Eclipse Magnitude, Latitude, Longitude, Sun Altitude, Sun Azimuth, "
        "Path Width (km), Central Line Duration"
    )

    # Number of records sent to Pinecone per upsert call.
    _UPSERT_BATCH_SIZE = 180

    def __init__(
        self,
        OPEN_AI_KEY,
        PINECONE_KEY,
        chunk_size: int = 1500,
        embed_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-3.5-turbo",
    ):
        """Connect to OpenAI and Pinecone and prepare both vector stores.

        Args:
            OPEN_AI_KEY: API key passed to the OpenAI client and to the
                LangChain embedding wrapper.
            PINECONE_KEY: API key passed to the Pinecone client.
            chunk_size: Target chunk size handed to the text splitter; also
                part of the prose namespace name and of the cache file name.
            embed_model: OpenAI embedding model name.
            chat_model: Default chat model used by :meth:`client_chat`.
        """
        self.chunk_size = chunk_size
        self.embed_model = embed_model
        self.chat_model = chat_model

        self.__client = OpenAI(api_key=OPEN_AI_KEY)
        self.__pc = Pinecone(api_key=PINECONE_KEY)
        self.__index = self.__pc.Index('eep596mp2')

        print("Chunking documents.")
        self.chunked_texts, self.chunked_pnums = self.__chunk_document()
        self.table_texts, self.table_pnums = self.__chunk_tables()
        print("Initializing vector store.")
        self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY)
        print("Initializing table store.")
        self.tablespace, self.tablestore = self.__init_tablestore(OPEN_AI_KEY)

    # ------------------------------------------------------------------
    # Document loading / chunking
    # ------------------------------------------------------------------

    def __chunk_document(self) -> tuple[list[str], list[int]]:
        """Split the information PDF into overlapping chunks.

        The last chunk of each page is not emitted immediately; it is
        prepended to the next page's text so that chunks can span a page
        break.  The final carried-over chunk is emitted after the last page.

        Returns:
            ``(chunked_texts, chunk_page_numbers)`` of equal length, where
            each page number is the ``metadata["page"]`` of the page that was
            being processed when the chunk was emitted.
        """
        loader = PyMuPDFLoader(file_path="Solar Eclipse Information.pdf", mode="page")
        docs = loader.load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=500)

        chunked_texts, chunk_page_numbers = [], []
        carried_tail = ""
        tail_page = None  # page of the last non-empty split; None until one exists
        for page in docs:
            pnum = page.metadata["page"]
            chunks = splitter.split_text(carried_tail + " " + page.page_content)
            if not chunks:
                # Blank page with nothing carried over: nothing to emit and
                # nothing to carry (indexing chunks[-1] here used to crash).
                continue
            *complete, carried_tail = chunks
            chunked_texts.extend(complete)
            chunk_page_numbers.extend([pnum] * len(complete))
            tail_page = pnum

        # Flush the chunk still being carried, if the PDF produced any text.
        if tail_page is not None:
            chunked_texts.append(carried_tail)
            chunk_page_numbers.append(tail_page)
        return chunked_texts, chunk_page_numbers

    def __chunk_tables(self):
        """Turn every page of the table PDF into one formatted text chunk.

        Returns:
            ``(table_texts, table_numbers)`` with one entry per PDF page; the
            numbers are each page's ``metadata["page"]``.
        """
        tabler = PyMuPDFLoader(file_path="Solar Eclipse Table.pdf", mode="page")
        tables = tabler.load()
        table_texts = [self._format_table_page(page.page_content) for page in tables]
        table_numbers = [page.metadata["page"] for page in tables]
        return table_texts, table_numbers

    @staticmethod
    def _format_table_page(page_content: str) -> str:
        """Rebuild one extracted table page as header + one line per row.

        The extracted page is a newline-separated stream of cell values.
        Everything up to and including the first ``"km"`` (plus the character
        after it) is skipped; if ``"km"`` is absent the whole page is used.
        A row is considered finished a fixed number of cells after the cell
        ending in ``"W"``/``"E"``: 2 more cells when a cell starting with
        ``"P"`` was seen in that row, otherwise 4 more.

        Args:
            page_content: Raw text of one table page.

        Returns:
            ``"Solar eclipses between <first> and <last>:"`` followed by a
            blank line, the column header, and the rebuilt rows.  ``<first>``
            is the third cell of the first row, ``<last>`` the third cell of
            the most recent later row (``None`` when not available).
        """
        marker = page_content.find("km")
        # find() returns -1 when the marker is missing; slicing from
        # -1 + 3 would silently drop the first two characters.
        start = marker + 3 if marker != -1 else 0
        values = page_content[start:].split("\n")

        parts = []
        first_date = last_date = None
        idv = 0  # cell position in the current row; negative = cells left after longitude
        partial_flag = False
        for val in values:
            if idv == 2:
                if first_date is None:
                    first_date = val
                else:
                    last_date = val
            if idv % 16 == 4:
                # NOTE(review): as visible in the source this replaces a space
                # with a space; the original literal may have been a double or
                # non-breaking space lost in transit -- confirm.
                val = val.replace(" ", " ")
            parts.append(val + " ")
            idv += 1
            if val.startswith("P"):
                partial_flag = True
            if val.endswith(("W", "E")):
                idv = -2 if partial_flag else -4
            if idv == 0:
                parts.append("\n")
                partial_flag = False

        return (
            f"Solar eclipses between {first_date} and {last_date}:\n\n"
            + API_Interface._TABLE_HEADER + "\n" + "".join(parts)
        )

    # ------------------------------------------------------------------
    # Pinecone namespaces
    # ------------------------------------------------------------------

    def _namespace_needs_upload(self, namespace) -> bool:
        """Return True when *namespace* is missing from the index.

        A namespace that exists but reports no vectors is cleared first and
        then treated as missing.
        """
        stats = self.__index.describe_index_stats()['namespaces'].get(namespace)
        if stats is not None and stats.get('vector_count') in (None, 0):
            self.__index.delete(delete_all=True, namespace=namespace)
            stats = None
        return stats is None

    def _upload_records(self, namespace, texts, page_numbers, embeddings) -> None:
        """Upsert one record per chunk into *namespace* and wait for it to appear."""
        records = [
            {
                "id": f"chunk{i}",
                "values": embedding,
                "metadata": {
                    "text": text,
                    "page_number": pnum
                }
            }
            for i, (text, pnum, embedding) in enumerate(zip(texts, page_numbers, embeddings))
        ]
        print(len(records))
        batch_size = self._UPSERT_BATCH_SIZE
        print("... upsertting records.")
        for start in tqdm(range(0, len(records), batch_size)):
            self.__index.upsert(records[start:start + batch_size], namespace=namespace)
        # Poll until the index reports the namespace.  Skipped when nothing
        # was sent, because the namespace would then never appear and the
        # loop would spin forever.
        if records:
            while self.__index.describe_index_stats()['namespaces'].get(namespace) is None:
                time.sleep(1)
        print("Index stats:", self.__index.describe_index_stats())

    def _build_vectorstore(self, OPEN_AI_KEY):
        """Create a LangChain vector store over the index using ``embed_model``."""
        openaiembs = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        return PineconeVectorStore(self.__index, embedding=openaiembs)

    def __init_vectorstore(self, OPEN_AI_KEY):
        """Ensure the prose namespace is populated.

        Returns:
            ``(namespace_name, vectorstore)``; the namespace name embeds
            ``chunk_size``.
        """
        NAMESPACE = f"ns_eclipse_{self.chunk_size}"
        if self._namespace_needs_upload(NAMESPACE):
            print("... generating embeddings.")
            self.__generate_embeddings()
            self._upload_records(NAMESPACE, self.chunked_texts, self.chunked_pnums, self.embeddings)
        return NAMESPACE, self._build_vectorstore(OPEN_AI_KEY)

    def __init_tablestore(self, OPEN_AI_KEY):
        """Ensure the table namespace is populated.

        Returns:
            ``(namespace_name, vectorstore)``.
        """
        NAMESPACE = "ts_eclipse"
        if self._namespace_needs_upload(NAMESPACE):
            print("... generating table embeddings.")
            self.__generate_table_embeddings()
            self._upload_records(NAMESPACE, self.table_texts, self.table_pnums, self.tmbeddings)
        return NAMESPACE, self._build_vectorstore(OPEN_AI_KEY)

    # ------------------------------------------------------------------
    # Embeddings
    # ------------------------------------------------------------------

    def _embed_text(self, text):
        """Embed a single text with ``embed_model`` (newlines replaced by spaces)."""
        text = text.replace("\n", " ")
        response = self.__client.embeddings.create(input=[text], model=self.embed_model)
        return response.data[0].embedding

    def _load_or_create_embeddings(self, texts, cache_path):
        """Return one embedding per text, using *cache_path* as a pickle cache.

        The cache is reused only when it holds exactly ``len(texts)`` entries;
        otherwise (or when the file is absent) embeddings are recomputed and
        the file is rewritten.  Without that check a stale cache would be
        silently truncated by ``zip`` when records are built.
        """
        if os.path.exists(cache_path):
            print("--- found existing embeddings file. Shortcutting.")
            # NOTE(security): pickle.load can execute arbitrary code; this
            # file must only ever be a cache written by this class.
            with open(cache_path, "rb") as infile:
                cached = pickle.load(infile)
            if len(cached) == len(texts):
                return cached
            print("--- cached embeddings do not match the current chunks. Regenerating.")

        embeddings = [self._embed_text(text) for text in tqdm(texts)]
        with open(cache_path, "wb") as outfile:
            pickle.dump(embeddings, outfile)
        return embeddings

    def __generate_embeddings(self) -> None:
        """ Generates self.embeddings """
        EMBED_PATH = f"eclipse_text_embeddings_{self.chunk_size}.pkl"
        self.embeddings = self._load_or_create_embeddings(self.chunked_texts, EMBED_PATH)

    def __generate_table_embeddings(self) -> None:
        """ Generates self.tmbeddings """
        TABLE_PATH = "eclipse_table_embeddings.pkl"
        self.tmbeddings = self._load_or_create_embeddings(self.table_texts, TABLE_PATH)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def query_pinecone_vector_store(self, query: str, top_k_docs: int = 5, top_k_tbls: int = 5,
                                    namespace: str = None, tablespace: str = None):
        """Run a similarity search against the prose and the table namespaces.

        Args:
            query: Natural-language query to embed and search for.
            top_k_docs: Number of prose chunks to retrieve.
            top_k_tbls: Number of table pages to retrieve.
            namespace: Prose namespace; defaults to ``self.namespace``.
            tablespace: Table namespace; defaults to ``self.tablespace``.

        Returns:
            A two-element list ``[prose_hits, table_hits]``.  Each element is
            the search result transposed with ``zip``: presumably
            ``(documents, scores)`` -- verify against the vector store's
            return type -- or an empty tuple when there were no hits.

        Raises:
            ValueError: If either namespace is not present in the index.
        """
        namespace = namespace or self.namespace
        tablespace = tablespace or self.tablespace

        # One stats call serves both checks.  Explicit raises are used
        # because ``assert`` statements are removed under ``python -O``.
        known_namespaces = self.__index.describe_index_stats().get('namespaces') or {}
        for wanted in (namespace, tablespace):
            if wanted not in known_namespaces:
                raise ValueError(f"Namespace {wanted!r} does not exist in the index.")

        response = self.vectorstore.similarity_search_with_relevance_scores(
            query=query, k=top_k_docs, namespace=namespace)
        # Both stores wrap the same index; the namespace argument selects the data.
        tesponse = self.tablestore.similarity_search_with_relevance_scores(
            query=query, k=top_k_tbls, namespace=tablespace)
        return [tuple(zip(*response)), tuple(zip(*tesponse))]

    def client_chat(self, messages, model=None):
        """Send *messages* to the chat completion API and return the reply text.

        Args:
            messages: Message list passed through to ``chat.completions.create``.
            model: Chat model name; defaults to ``self.chat_model``.

        Returns:
            The ``content`` of the first returned choice.
        """
        model = model or self.chat_model
        response = self.__client.chat.completions.create(messages=messages, model=model)
        return response.choices[0].message.content
if __name__ == "__main__":
    # Manual smoke test: build the interface and print the retrieved chunks.
    # The constructor requires both API keys; they are read from the
    # environment so no secret is hard-coded (a missing variable raises
    # KeyError immediately instead of failing later inside a client).
    tester = API_Interface(
        OPEN_AI_KEY=os.environ["OPENAI_API_KEY"],
        PINECONE_KEY=os.environ["PINECONE_API_KEY"],
    )
    my_query = "What is the backpropogation algorithm?"
    # The query returns [prose_hits, table_hits]; each is a transposed
    # (documents, scores) pair, or an empty tuple when nothing matched.
    for hits in tester.query_pinecone_vector_store(my_query):
        if not hits:
            continue
        documents, _scores = hits
        for doc in documents:
            print(doc.metadata["page_number"], doc.page_content, "\n\n")