"""
Python Backend API to chat with private data
08/15/2023
D.M. Theekshana Samaradiwakara
"""
import os
from multiprocessing import Pool

from tqdm import tqdm
from dotenv import load_dotenv

from chromaDb import save_files
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from chroma import load_store

text_loader_kwargs = {'autodetect_encoding': True}
load_dotenv()
# Environment variables come back as strings; cast to int for the text splitter.
# The fallbacks (500 / 50) are assumed defaults, not taken from the original code.
chunk_size = int(os.environ.get('EMBEDDING_CHUNK_SIZE', 500))
chunk_overlap = int(os.environ.get('EMBEDDING_CHUNK_OVERLAP', 50))
embeddings_model_name = os.environ.get("EMBEDDINGS_MODEL_NAME")
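
# Illustrative .env entries these reads expect (the values are assumptions,
# not taken from the repo's actual configuration):
#   EMBEDDING_CHUNK_SIZE=500
#   EMBEDDING_CHUNK_OVERLAP=50
#   EMBEDDINGS_MODEL_NAME=all-MiniLM-L6-v2
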
# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (UnstructuredEmailLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed
}
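
# Illustrative lookup (the path "reports/minutes.pdf" is a made-up example,
# not a file in this repo):
#   loader_cls, loader_kwargs = LOADER_MAPPING[".pdf"]   # -> (PyMuPDFLoader, {})
#   docs = loader_cls("reports/minutes.pdf", **loader_kwargs).load()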

class DataPipeline:
    def __init__(self):
        self.dataset_name = None
        self.vectorstore = None

    def load_documents_in_folder(self, folder):
        # Load every non-hidden file in the folder as plain text
        print("Loading documents...")
        loader = DirectoryLoader(folder, glob="**/[!.]*", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)
        pages = loader.load()
        return pages

    def load_single_document(self, doc):
        # Resolve the loader from the file extension, normalised to lower case
        # so that e.g. "Report.PDF" still matches the mapping
        ext = "." + doc.name.rsplit(".", 1)[-1].lower()
        if ext in LOADER_MAPPING:
            loader_class, loader_args = LOADER_MAPPING[ext]
            loader = loader_class(doc, **loader_args)
            return loader.load()
        raise ValueError(f"Unsupported file extension '{ext}'")

    def load_documents(self, uploaded_files):
        # Load files in parallel; imap_unordered yields each file's documents
        # as soon as a worker finishes with it
        with Pool(processes=os.cpu_count()) as pool:
            results = []
            with tqdm(total=len(uploaded_files), desc='Loading new documents', ncols=80) as pbar:
                for docs in pool.imap_unordered(self.load_single_document, uploaded_files):
                    results.extend(docs)
                    pbar.update()
        return results

    def load_streamlit_documents(self, uploaded_files, year):
        # Wrap Streamlit UploadedFile objects in LangChain Documents,
        # tagging each with its source file name and year
        documents = []
        for uploaded_file in uploaded_files:
            source = uploaded_file.name
            content = uploaded_file.read().decode('latin-1')
            doc = Document(
                page_content=content,
                metadata={
                    "source": source,
                    "year": year,
                },
            )
            documents.append(doc)
        return documents

    def process_documents(self, documents):
        print("Splitting documents into chunks. May take some minutes...")
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            # is_separator_regex is needed for the look-behind pattern;
            # without it the separators are matched literally
            separators=["\n\n", "\n", r"(?<=\. )", " ", ""],
            is_separator_regex=True,
        )
        texts = text_splitter.split_documents(documents)
        return texts

    def persist_documents(self, persist_directory, document_splits):
        # Persist the splits to the on-disk store via the local chromaDb helper
        save_files(persist_directory, document_splits)

    def add_metadata(self, documents, metadata, value):
        # Set the same metadata key/value pair on every document
        for doc in documents:
            doc.metadata[metadata] = value
        return documents
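
# Minimal usage sketch, not part of the original module. The "docs/" source
# folder, the "db/" persist directory, and the year value are illustrative
# assumptions.
if __name__ == "__main__":
    pipeline = DataPipeline()
    documents = pipeline.load_documents_in_folder("docs/")
    documents = pipeline.add_metadata(documents, "year", "2023")
    splits = pipeline.process_documents(documents)
    pipeline.persist_documents("db/", splits)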