"""Document ingestion utilities: chunk PDF docs into langchain Documents and
create/delete/upsert vector-store indexes (Pinecone, ChromaDB, RAGatouille).

NOTE(review): the source under review arrived with its physical line structure
destroyed and a span of code missing from its middle (the tail of
``chunk_docs`` and the head of the index-building function). The missing span
is reconstructed conservatively below and marked with NOTE(review) comments —
verify those sections against version control before relying on them.
"""
import os
import re
import json
import string
import shutil
import logging

import pinecone
import chromadb
import jsonlines
from tqdm import tqdm
from dotenv import load_dotenv, find_dotenv
from langchain_community.vectorstores import Pinecone
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import VoyageEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document as lancghain_Document
from ragatouille import RAGPretrainedModel

load_dotenv(find_dotenv(), override=True)

# Set secrets from environment file
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
VOYAGE_API_KEY = os.getenv('VOYAGE_API_KEY')
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
HUGGINGFACEHUB_API_TOKEN = os.getenv('HUGGINGFACEHUB_API_TOKEN')


def chunk_docs(docs, chunk_method='tiktoken_recursive', file=None, chunk_size=500, chunk_overlap=0, use_json=False):
    """Parse a list of PDF paths into chunked langchain Documents.

    Args:
        docs: iterable of PDF file paths to parse.
        chunk_method: only 'tiktoken_recursive' is implemented.
        file: optional .jsonl path. When ``use_json`` is True and the file
            exists, chunks are loaded from it instead of re-parsing the PDFs.
        chunk_size: chunk size in tokens.
        chunk_overlap: overlap between consecutive chunks, in tokens.
        use_json: read pre-chunked docs from ``file`` when it exists.

    Returns:
        List of langchain Documents that pass ``has_meaningful_content``.

    Raises:
        NotImplementedError: for any ``chunk_method`` other than
            'tiktoken_recursive'.
    """
    docs_out = []
    if file:
        logging.info('Jsonl file to be used: ' + file)
    # Guard ``file`` as well: os.path.exists(None) raises TypeError when
    # use_json=True is passed without a file.
    if use_json and file and os.path.exists(file):
        logging.info('Jsonl file found, using this instead of parsing docs.')
        with open(file, "r") as file_in:
            file_data = [json.loads(line) for line in file_in]
        # Process the file data and put it into the same format as docs_out
        for line in file_data:
            doc_temp = lancghain_Document(page_content=line['page_content'],
                                          source=line['metadata']['source'],
                                          page=line['metadata']['page'],
                                          metadata=line['metadata'])
            if has_meaningful_content(doc_temp):
                docs_out.append(doc_temp)
        logging.info('Parsed: ' + file)
        logging.info('Number of entries: ' + str(len(docs_out)))
        if docs_out:  # Guard: docs_out[0]/docs_out[-1] would IndexError on empty
            logging.info('Sample entries:')
            logging.info(str(docs_out[0]))
            logging.info(str(docs_out[-1]))
    else:
        logging.info('No jsonl found. Reading and parsing docs.')
        logging.info('Chunk size (tokens): ' + str(chunk_size))
        logging.info('Chunk overlap (tokens): ' + str(chunk_overlap))
        for doc in tqdm(docs, desc='Reading and parsing docs'):
            logging.info('Parsing: ' + doc)
            loader = PyPDFLoader(doc)
            data = loader.load_and_split()
            if chunk_method == 'tiktoken_recursive':
                text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                    chunk_size=chunk_size, chunk_overlap=chunk_overlap)
            else:
                raise NotImplementedError
            pages = text_splitter.split_documents(data)

            # Tidy up text by removing unnecessary characters
            for page in pages:
                page.metadata['source'] = os.path.basename(page.metadata['source'])  # Strip path
                page.metadata['page'] = int(page.metadata['page']) + 1  # Pages are 0 based, update
                page.page_content = re.sub(r"(\w+)-\n(\w+)", r"\1\2", page.page_content)  # Merge hyphenated words
                # NOTE(review): the original text was garbled from here down to
                # the RAGatouille section in load_docs. The following lines are
                # a conservative reconstruction — confirm against VCS history.
                page.page_content = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ",
                                           page.page_content.strip())  # Fix newlines mid-sentence
                page.page_content = re.sub(r"\n\s*\n", "\n\n", page.page_content)  # Collapse repeated newlines
                if has_meaningful_content(page):
                    docs_out.append(page)
        logging.info('Number of entries: ' + str(len(docs_out)))
        if file:
            # Cache the parsed chunks so the next run can skip PDF parsing.
            with jsonlines.open(file, mode='w') as writer:
                for doc_out in docs_out:
                    writer.write(doc_out.dict())
            logging.info('Written: ' + file)
    return docs_out


def load_docs(index_type, docs, query_model, index_name=None, chunk_size=500,
              chunk_overlap=0, clear=False, use_json=False, file=None,
              batch_size=50, db_path='../db'):
    """Chunk ``docs`` and upsert them into the requested vector store.

    NOTE(review): everything above the RAGatouille branch (signature included)
    was missing from the garbled source and has been reconstructed — verify
    the function name, parameter names and defaults against VCS history.

    Args:
        index_type: "Pinecone", "ChromaDB" or "RAGatouille".
        docs: PDF paths handed to ``chunk_docs``.
        query_model: embedding model (or RAGatouille pretrained-model name).
        index_name: target index; when falsy, no index is touched and the
            chunked documents are returned instead.
        clear: delete any pre-existing index of the same name first.
        db_path: root folder for local databases (ChromaDB / RAGatouille).

    Returns:
        The populated vector store when ``index_name`` is given, otherwise
        the list of chunked documents.
    """
    docs_out = chunk_docs(docs, chunk_size=chunk_size, chunk_overlap=chunk_overlap,
                          use_json=use_json, file=file)
    vectorstore = None
    if index_name:
        if index_type == "Pinecone":
            # Import and initialize Pinecone client
            pinecone.init(api_key=PINECONE_API_KEY)
            if clear:
                delete_index(index_type, index_name)
            try:
                pinecone.describe_index(index_name)
                logging.info(f"Index {index_name} exists. Adding {len(docs_out)} entries to index.")
            except Exception:
                logging.info(f"Index {index_name} does not exist. Creating new index.")
                pinecone.create_index(index_name, dimension=embedding_size(query_model))
                logging.info(f"Index {index_name} created.")
            index = pinecone.Index(index_name)
            # Calculate embeddings on the page_content field.
            vectorstore = Pinecone(index, query_model, "page_content")
            vectorstore = batch_upsert(index_type, vectorstore, docs_out, batch_size=batch_size)
        elif index_type == "ChromaDB":
            logging.info(f"Creating new index {index_name}.")
            persistent_client = chromadb.PersistentClient(path=os.path.join(db_path, 'chromadb'))
            vectorstore = Chroma(client=persistent_client,
                                 collection_name=index_name,
                                 embedding_function=query_model)
            vectorstore = batch_upsert(index_type, vectorstore, docs_out, batch_size=batch_size)
        elif index_type == "RAGatouille":
            # --- From here down the original source text was intact. ---
            logging.info(f'Setting up RAGatouille model {query_model}')
            vectorstore = RAGPretrainedModel.from_pretrained(query_model)
            docs_out_colbert = [doc.page_content for doc in docs_out]
            if chunk_size > 500:
                raise ValueError("RAGatouille cannot handle chunks larger than 500 tokens. Reduce token count.")
            vectorstore.index(
                collection=docs_out_colbert,
                index_name=index_name,
                max_document_length=chunk_size,
                overwrite_index=True,
                split_documents=True,
            )
            logging.info(f"Index created: {vectorstore}")
            # Move the directory to the db folder
            logging.info(f"Moving RAGatouille index to {db_path}")
            ragatouille_path = os.path.join(db_path, '.ragatouille')
            if os.path.exists(ragatouille_path):
                shutil.rmtree(ragatouille_path)
                logging.info(f"RAGatouille index deleted from {ragatouille_path}")
            shutil.move('./.ragatouille', db_path)
            logging.info(f"RAGatouille index created in {db_path}:" + str(vectorstore))
    # Return vectorstore or docs
    if index_name:
        return vectorstore
    else:
        return docs_out


def delete_index(index_type, index_name):
    """Delete an existing index named ``index_name``.

    Args:
        index_type: "Pinecone", "ChromaDB" or "RAGatouille" (not implemented).

    Raises:
        Exception: if a Pinecone index of that name does not exist.
        NotImplementedError: for RAGatouille indexes.
    """
    if index_type == "Pinecone":
        # Import and initialize Pinecone client
        pinecone.init(api_key=PINECONE_API_KEY)
        try:
            pinecone.describe_index(index_name)
            logging.info(f"Index {index_name} exists.")
        # Narrowed from a bare ``except:`` (which swallowed KeyboardInterrupt
        # and SystemExit); chain the cause for debuggability.
        except Exception as e:
            raise Exception(f"Index {index_name} does not exist, cannot delete.") from e
        else:
            pinecone.delete_index(index_name)
            logging.info(f"Index {index_name} deleted.")
    elif index_type == "ChromaDB":
        # Delete existing collection
        logging.info(f"Deleting index {index_name}.")
        persistent_client = chromadb.PersistentClient(path='../db/chromadb')
        persistent_client.delete_collection(name=index_name)
        logging.info("Index deleted.")
    elif index_type == "RAGatouille":
        raise NotImplementedError


def batch_upsert(index_type, vectorstore, docs_out, batch_size=50):
    """Insert ``docs_out`` into ``vectorstore`` in batches of ``batch_size``.

    Returns the same ``vectorstore`` for call-chaining. Unknown ``index_type``
    values are a silent no-op (original behavior, preserved).
    """
    for i in range(0, len(docs_out), batch_size):
        chunk_batch = docs_out[i:i + batch_size]
        if index_type == "Pinecone":
            vectorstore.add_documents(chunk_batch)
        elif index_type == "ChromaDB":
            # Happens to be the same call as Pinecone; branch kept in case a
            # backend ever needs different handling.
            vectorstore.add_documents(chunk_batch)
    return vectorstore


def has_meaningful_content(page):
    """Return True when the page has at least 5 words and more than 30%
    alphanumeric characters; False otherwise (including empty pages).
    """
    text = page.page_content
    if not text:
        # Guard: the percentage below would divide by zero on empty text.
        return False
    num_words = len(text.split())
    alphanumeric_pct = sum(c.isalnum() for c in text) / len(text)
    if num_words < 5 or alphanumeric_pct < 0.3:
        return False
    else:
        return True


def embedding_size(embedding_model):
    """Return the embedding dimension of the given embedding model.

    Raises:
        NotImplementedError: for unrecognized embedding model types.
    """
    if isinstance(embedding_model, OpenAIEmbeddings):
        return 1536  # https://platform.openai.com/docs/models/embeddings, text-embedding-ada-002
    elif isinstance(embedding_model, VoyageEmbeddings):
        return 1024  # https://docs.voyageai.com/embeddings/, voyage-02
    else:
        raise NotImplementedError