# "Spaces: Sleeping" — Hugging Face Space status banner captured when this
# file was exported from the hosting UI; not part of the program.
import os
import pickle
import string
import time

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Pinecone  # NOTE: shadowed by pinecone.Pinecone below
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from openai import OpenAI
from pinecone import Pinecone  # deliberately last: the Pinecone *client* class wins the name
from tqdm import tqdm
class API_Interface:
    """RAG backend for a solar-eclipse Q&A demo.

    Chunks two local PDFs ("Solar Eclipse Information.pdf" prose and
    "Solar Eclipse Table.pdf" catalog pages), embeds the chunks with an
    OpenAI embedding model (caching embeddings in local pickle files),
    upserts them into a shared Pinecone index under two namespaces, and
    exposes similarity search plus chat completions.
    """

    def __init__(self, OPEN_AI_KEY, PINECONE_KEY, chunk_size: int = 1500,
                 embed_model: str = "text-embedding-3-small",
                 chat_model: str = "gpt-3.5-turbo"):
        """Chunk both PDFs and build the document and table namespaces.

        Args:
            OPEN_AI_KEY: OpenAI API key.
            PINECONE_KEY: Pinecone API key.
            chunk_size: character budget per document chunk.
            embed_model: OpenAI embedding model name.
            chat_model: default model used by client_chat().
        """
        self.chunk_size = chunk_size
        self.embed_model = embed_model
        self.chat_model = chat_model
        self.__client = OpenAI(api_key=OPEN_AI_KEY)
        self.__pc = Pinecone(api_key=PINECONE_KEY)
        # The index must already exist in the Pinecone project; this does not
        # create it.
        self.__index = self.__pc.Index('eep596mp2')
        print("Chunking documents.")
        self.chunked_texts, self.chunked_pnums = self.__chunk_document()
        self.table_texts, self.table_pnums = self.__chunk_tables()
        print("Initializing vector store.")
        self.namespace, self.vectorstore = self.__init_vectorstore(OPEN_AI_KEY)
        print("Initializing table store.")
        self.tablespace, self.tablestore = self.__init_tablestore(OPEN_AI_KEY)

    def __chunk_document(self) -> tuple[list[str], list[int]]:
        """Load the prose PDF and split it into overlapping text chunks.

        The last chunk of each page is carried into the next page's text so
        chunks can span page breaks; it is only emitted once, after the loop.

        Returns:
            (chunked_texts, chunk_page_numbers): parallel lists of chunk text
            and the page number (from PyMuPDF metadata) each chunk ends on.
        """
        loader = PyMuPDFLoader(file_path="Solar Eclipse Information.pdf", mode="page")
        docs = loader.load()
        page_texts = [page.page_content for page in docs]
        page_numbers = [page.metadata["page"] for page in docs]
        splitter = RecursiveCharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=500)
        chunked_texts, chunk_page_numbers = [], []
        previous_page_tail = ""
        pnum = None
        for text, pnum in zip(page_texts, page_numbers):
            chunks = splitter.split_text(previous_page_tail + " " + text)
            # Emit all but the last chunk; the tail is deferred to next page.
            chunked_texts.extend(chunks[:-1])
            chunk_page_numbers.extend([pnum] * (len(chunks) - 1))
            previous_page_tail = chunks[-1]
        if pnum is not None:
            # Flush the final carried tail (guards against an empty PDF,
            # where the loop never ran and there is nothing to flush).
            chunked_texts.append(previous_page_tail)
            chunk_page_numbers.append(pnum)
        return chunked_texts, chunk_page_numbers

    def __chunk_tables(self) -> tuple[list[str], list[int]]:
        """Load the catalog PDF and rebuild one CSV-like text per page.

        PyMuPDF yields each page as a flat stream of cell values separated by
        newlines; this walks that stream, re-inserting row breaks, and
        prefixes each page's text with the year range it covers plus the
        column header.

        Returns:
            (table_texts, table_numbers): one reconstructed text and one page
            number (from PyMuPDF metadata) per PDF page.
        """
        tabler = PyMuPDFLoader(file_path="Solar Eclipse Table.pdf", mode="page")
        tables = tabler.load()
        HEADER = "Catalog Number, Canon Plate, Calendar Date, Terrestrial Dynamical Time of Greatest Eclipse, UT - TD (s), Luna Number, Saros Number, Eclipse Type, QLE, Gamma, Eclipse Magnitude, Latitude, Longitude, Sun Altitude, Sun Azimuth, Path Width (km), Central Line Duration"
        table_texts = []
        for page in tables:
            content = page.page_content
            # Data values start just after the "km" token that ends the
            # page's header block.
            start = content.find("km")
            values = content[start + 3:].split("\n")
            text = ""
            idv = 0                # column counter within the current row
            dates = [None, None]   # first and latest year seen on this page
            partial_flag = False   # current row looks like a partial eclipse
            for val in values:
                if idv == 2:
                    # Column index 2 appears to carry the year of the
                    # calendar date -- TODO confirm against the PDF layout.
                    year = val
                    if dates[0] is None:
                        dates[0] = year
                    else:
                        dates[1] = year
                if idv % 16 == 4:
                    # No-op as written (replaces a space with a space);
                    # presumably meant to normalize a special whitespace
                    # character in this column -- verify against the PDF.
                    val = val.replace(" ", " ")
                text += val + " "
                idv += 1
                if val.startswith("P"):
                    partial_flag = True
                if val.endswith("W") or val.endswith("E"):
                    # A longitude value marks the approach of end-of-row;
                    # partial rows carry fewer trailing columns, so the
                    # counter is rewound by a different amount.
                    idv = -2 if partial_flag else -4
                if idv == 0:
                    # Counter wrapped back to zero: the row is complete.
                    text += "\n"
                    partial_flag = False
            table_texts.append(f"Solar eclipses between {dates[0]} and {dates[1]}:\n\n" + HEADER + "\n" + text)
        table_numbers = [page.metadata["page"] for page in tables]
        return table_texts, table_numbers

    def __namespace_is_empty(self, namespace: str) -> bool:
        """Return True if `namespace` is absent or holds zero vectors.

        A present-but-empty namespace is deleted so it can be rebuilt.
        """
        stats = self.__index.describe_index_stats()['namespaces'].get(namespace)
        if stats is not None and stats.get('vector_count') in (None, 0):
            self.__index.delete(delete_all=True, namespace=namespace)
            stats = None
        return stats is None

    def __upsert_records(self, namespace: str, texts: list[str],
                         pnums: list[int], embeddings: list[list[float]]) -> None:
        """Batch-upsert (text, page, embedding) records into `namespace`.

        Blocks until the namespace shows up in index stats, since Pinecone
        upserts are eventually consistent.
        """
        records = [
            {
                "id": f"chunk{i}",
                "values": embedding,
                "metadata": {"text": text, "page_number": pnum},
            }
            for i, (text, pnum, embedding) in enumerate(zip(texts, pnums, embeddings))
        ]
        print(len(records))
        batch_size = 180  # keeps each upsert under Pinecone request limits
        print("... upserting records.")
        for b in tqdm(range((len(records) - 1) // batch_size + 1)):
            self.__index.upsert(records[b * batch_size:(b + 1) * batch_size], namespace=namespace)
        while self.__index.describe_index_stats()['namespaces'].get(namespace) is None:
            time.sleep(1)
        print("Index stats:", self.__index.describe_index_stats())

    def __make_store(self, OPEN_AI_KEY) -> PineconeVectorStore:
        """Build a LangChain vector store over the shared Pinecone index."""
        embedder = OpenAIEmbeddings(api_key=OPEN_AI_KEY, model=self.embed_model)
        return PineconeVectorStore(self.__index, embedding=embedder)

    def __init_vectorstore(self, OPEN_AI_KEY):
        """Ensure the document namespace is populated; return (name, store)."""
        NAMESPACE = f"ns_eclipse_{self.chunk_size}"
        if self.__namespace_is_empty(NAMESPACE):
            print("... generating embeddings.")
            self.__generate_embeddings()
            self.__upsert_records(NAMESPACE, self.chunked_texts, self.chunked_pnums, self.embeddings)
        return NAMESPACE, self.__make_store(OPEN_AI_KEY)

    def __init_tablestore(self, OPEN_AI_KEY):
        """Ensure the table namespace is populated; return (name, store)."""
        NAMESPACE = "ts_eclipse"
        if self.__namespace_is_empty(NAMESPACE):
            print("... generating table embeddings.")
            self.__generate_table_embeddings()
            self.__upsert_records(NAMESPACE, self.table_texts, self.table_pnums, self.tmbeddings)
        return NAMESPACE, self.__make_store(OPEN_AI_KEY)

    def __embed_texts(self, texts: list[str], cache_path: str) -> list[list[float]]:
        """Embed `texts` with the OpenAI API, caching the result as a pickle.

        If `cache_path` exists it is loaded instead of re-embedding.
        NOTE(review): pickle.load on an attacker-supplied file executes
        arbitrary code; this assumes the cache file is this class's own
        trusted local output.
        """
        if os.path.exists(cache_path):
            print("--- found existing embeddings file. Shortcutting.")
            with open(cache_path, "rb") as infile:
                return pickle.load(infile)
        embeddings = []
        for text in tqdm(texts):
            # Newlines are flattened before embedding, matching the original
            # preprocessing.
            flat = text.replace("\n", " ")
            response = self.__client.embeddings.create(input=[flat], model=self.embed_model)
            embeddings.append(response.data[0].embedding)
        with open(cache_path, "wb") as outfile:
            pickle.dump(embeddings, outfile)
        return embeddings

    def __generate_embeddings(self) -> None:
        """Populate self.embeddings for the document chunks (cached on disk)."""
        self.embeddings: list[list[float]] = self.__embed_texts(
            self.chunked_texts, f"eclipse_text_embeddings_{self.chunk_size}.pkl")

    def __generate_table_embeddings(self) -> None:
        """Populate self.tmbeddings for the table pages (cached on disk)."""
        self.tmbeddings: list[list[float]] = self.__embed_texts(
            self.table_texts, "eclipse_table_embeddings.pkl")

    def query_pinecone_vector_store(self, query: str, top_k_docs: int = 5, top_k_tbls: int = 5,
                                    namespace: str = None, tablespace: str = None):
        """Similarity-search both the document and table namespaces.

        Args:
            query: natural-language query text.
            top_k_docs: number of document chunks to retrieve.
            top_k_tbls: number of table pages to retrieve.
            namespace: overrides the default document namespace.
            tablespace: overrides the default table namespace.

        Returns:
            A two-element list [(docs, doc_scores), (tables, table_scores)]
            where each element pairs a tuple of Documents with a tuple of
            relevance scores (both empty tuples when nothing matches).

        Raises:
            ValueError: if either namespace is not present in the index.
        """
        namespace = namespace or self.namespace
        tablespace = tablespace or self.tablespace
        # Validate with real exceptions: `assert` is stripped under -O.
        known = self.__index.describe_index_stats().get('namespaces')
        if namespace not in known:
            raise ValueError(f"Unknown namespace: {namespace}")
        if tablespace not in known:
            raise ValueError(f"Unknown table namespace: {tablespace}")
        doc_response = self.vectorstore.similarity_search_with_relevance_scores(
            query=query, k=top_k_docs, namespace=namespace)
        # Use the table store here (the original queried self.vectorstore for
        # both; results are identical since the namespace is passed, but this
        # is what self.tablestore exists for).
        tbl_response = self.tablestore.similarity_search_with_relevance_scores(
            query=query, k=top_k_tbls, namespace=tablespace)
        return [tuple(zip(*doc_response)), tuple(zip(*tbl_response))]

    def client_chat(self, messages, model=None):
        """Run one chat completion and return the assistant's message text.

        Args:
            messages: OpenAI chat messages (list of role/content dicts).
            model: model name; defaults to self.chat_model.
        """
        model = model or self.chat_model
        response = self.__client.chat.completions.create(messages=messages, model=model)
        return response.choices[0].message.content
if __name__ == "__main__":
    # __init__ requires both API keys, so read them from the environment;
    # the original `API_Interface()` call with no arguments raised TypeError.
    tester = API_Interface(os.environ["OPENAI_API_KEY"], os.environ["PINECONE_API_KEY"])
    my_query = "What is the backpropogation algorithm?"
    # query_pinecone_vector_store returns [(docs, scores), (tables, scores)];
    # the original iterated that outer list and called .metadata on a tuple.
    (docs, doc_scores), (tables, table_scores) = tester.query_pinecone_vector_store(my_query)
    for doc, score in zip(docs, doc_scores):
        print(doc.metadata["page_number"], doc.page_content, "\n\n")