# Spaces:
# Runtime error
# Runtime error
"""Refer to | |
https://huggingface.co/spaces/mikeee/docs-chat/blob/main/app.py
and https://github.com/PromtEngineer/localGPT/blob/main/ingest.py
and https://python.langchain.com/en/latest/getting_started/tutorials.html
""" | |
# pylint: disable=broad-exception-caught, unused-import | |
import os | |
import time | |
from pathlib import Path | |
# import click | |
# from typing import List | |
import gradio as gr | |
from charset_normalizer import detect | |
from langchain.docstore.document import Document | |
from langchain.document_loaders import CSVLoader, PDFMinerLoader, TextLoader | |
# from constants import CHROMA_SETTINGS, SOURCE_DIRECTORY, PERSIST_DIRECTORY | |
from langchain.embeddings import HuggingFaceInstructEmbeddings | |
from langchain.text_splitter import ( | |
CharacterTextSplitter, | |
RecursiveCharacterTextSplitter, | |
) | |
from langchain.vectorstores import FAISS # FAISS instead of PineCone | |
from langchain.vectorstores import Chroma | |
from loguru import logger | |
from PyPDF2 import PdfReader # localgpt | |
from chromadb.config import Settings | |
# from utils import xlxs_to_csv | |
# load possible env such as OPENAI_API_KEY | |
# from dotenv import load_dotenv | |
# load_dotenv()
# Force the process timezone to Asia/Shanghai.
os.environ["TZ"] = "Asia/Shanghai"
try:
    # Make the C library pick up the TZ value that was just set.
    time.tzset()  # type: ignore # pylint: disable=no-member
except Exception:
    # Windows: ``time.tzset`` is not available there, so only warn.
    logger.warning("Windows, cant run time.tzset()")
# Folder that contains this file; the Chroma db is persisted in its "db" subfolder.
ROOT_DIRECTORY = Path(__file__).parent
PERSIST_DIRECTORY = f"{ROOT_DIRECTORY}/db"

# Chroma client settings shared by every store created in this module.
CHROMA_SETTINGS = Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory=PERSIST_DIRECTORY,
    anonymized_telemetry=False,
)
def load_single_document(file_path: str | Path) -> Document:
    """Load one file from disk into a single langchain ``Document``.

    ``.pdf`` files go through ``PDFMinerLoader``, ``.csv`` files through
    ``CSVLoader`` and everything else through ``TextLoader`` using the
    encoding detected by ``charset_normalizer``. Extension matching is
    case-insensitive.

    Args:
        file_path: Location of the file, as ``str`` or ``Path``.

    Returns:
        The loaded ``Document``. When a text file has no detectable encoding
        or cannot be decoded, an empty ``Document`` (``page_content=''``)
        carrying the ``source`` metadata is returned instead, so callers
        always receive a ``Document``. When a loader yields several documents
        (e.g. one per CSV row) their contents are joined into one.

    Raises:
        OSError: If the file cannot be read for encoding detection.
    """
    # Normalise once: the annotation allows ``Path`` but ``str.endswith`` and
    # the loaders need a plain string.
    source = str(file_path)
    dummy = Document(page_content='', metadata={'source': source})
    encoding = detect(Path(source).read_bytes()).get("encoding", "utf-8")
    lowered = source.lower()

    if lowered.endswith(".pdf"):
        docs = PDFMinerLoader(source).load()
    elif lowered.endswith(".csv"):
        docs = CSVLoader(source).load()
    else:
        if encoding is None:
            if lowered.endswith(".txt"):
                logger.warning(
                    f" {file_path}'s encoding is None "
                    "Something is fishy, return empty str "
                )
            else:
                logger.warning(
                    f" {file_path}'s encoding is None "
                    "Likely binary files, return empty str "
                )
            return dummy
        try:
            # Decoding happens in ``load()``, so it must sit inside the
            # ``try`` for the fallback to be reachable at all.
            docs = TextLoader(source, encoding=encoding).load()
        except Exception as exc:
            logger.warning(f" {exc}, return dummy ")
            return dummy

    if not docs:
        return dummy
    if len(docs) == 1:
        return docs[0]
    # Several documents for one file: keep all content rather than silently
    # dropping everything after the first entry.
    return Document(
        page_content="\n\n".join(doc.page_content for doc in docs),
        metadata={'source': source},
    )
def get_pdf_text(pdf_docs):
    """Return the extracted text of every page of every PDF, concatenated (docs-chat)."""
    page_texts = (
        page.extract_text()
        for pdf in pdf_docs
        for page in PdfReader(pdf).pages
    )
    return "".join(page_texts)
def get_text_chunks(text):
    """Split ``text`` on newlines into 1000-character chunks with 200 overlap (docs-chat)."""
    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return splitter.split_text(text)
def get_vectorstore(text_chunks):
    """Embed ``text_chunks`` and build a FAISS vector store from them (docs-chat).

    Args:
        text_chunks: Texts handed to ``FAISS.from_texts``.

    Returns:
        The FAISS vector store built from the chunks.
    """
    # embeddings = OpenAIEmbeddings()
    # Other candidates that used to be assigned here and immediately
    # overwritten: "hkunlp/instructor-xl", "hkunlp/instructor-large".
    model_name = "hkunlp/instructor-base"

    logger.info(f"Loading {model_name}")
    embeddings = HuggingFaceInstructEmbeddings(model_name=model_name)
    logger.info(f"Done loading {model_name}")

    logger.info(
        "Doing vectorstore FAISS.from_texts(texts=text_chunks, embedding=embeddings)"
    )
    vectorstore = FAISS.from_texts(texts=text_chunks, embedding=embeddings)
    logger.info(
        "Done vectorstore FAISS.from_texts(texts=text_chunks, embedding=embeddings)"
    )
    return vectorstore
def greet(name):
    """Log ``name`` at debug level and return a greeting for it (test callback)."""
    logger.debug(f" name: [{name}] ")
    greeting = "".join(("Hello ", name, "!!"))
    return greeting
def upload_files(files):
    """Ingest the uploaded files and return their paths.

    Each item in ``files`` must expose a ``name`` attribute holding its path.
    The paths are logged, passed to ``ingest`` and returned unchanged; the
    summary produced by ``ingest`` is not used.
    """
    uploaded_paths = [uploaded.name for uploaded in files]
    logger.info(uploaded_paths)
    ingest(uploaded_paths)
    return uploaded_paths
def ingest(
    file_paths: list[str | Path],
    model_name="hkunlp/instructor-base",
    device_type="cpu",
):
    """Load files, split them into chunks and persist them in a Chroma db.

    Args:
        file_paths: Paths of the files to ingest, e.g. the temp-file paths
            gradio reports for uploaded files.
        model_name: Model name handed to ``HuggingFaceInstructEmbeddings``.
        device_type: ``"cpu"`` or ``"mps"`` (any letter case); every other
            value selects ``"cuda"``.

    Returns:
        One ``[file name, character count]`` pair per successfully loaded
        document. Entries that did not load as a ``Document`` are skipped.
    """
    requested = str(device_type).lower()
    device = requested if requested in ("cpu", "mps") else "cuda"

    # Load documents and split in chunks
    loaded = [load_single_document(f"{file_path}") for file_path in file_paths]
    # Anything that is not a Document would break both the splitter and the
    # summary below, so drop it here and say so.
    documents = [doc for doc in loaded if isinstance(doc, Document)]
    if len(documents) != len(loaded):
        logger.warning(
            f"Skipped {len(loaded) - len(documents)} file(s) that did not load as documents"
        )

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    texts = text_splitter.split_documents(documents)
    logger.info(f"Loaded {len(documents)} documents ")
    logger.info(f"Split into {len(texts)} chunks of text")

    summary = [
        # ``or ""`` keeps Path() from choking on a missing/None source.
        [Path(str(doc.metadata.get("source") or "")).name, len(doc.page_content)]
        for doc in documents
    ]

    if not texts:
        # Nothing to embed: avoid loading the model and creating an empty store.
        logger.warning("No text chunks to embed, skipping Chroma db creation")
        return summary

    # Create embeddings
    embeddings = HuggingFaceInstructEmbeddings(
        model_name=model_name,
        model_kwargs={"device": device},
    )

    db = Chroma.from_documents(
        texts,
        embeddings,
        persist_directory=PERSIST_DIRECTORY,
        client_settings=CHROMA_SETTINGS,
    )
    db.persist()
    # Drop the reference once persisted (presumably to release the store —
    # TODO confirm this is still needed).
    db = None
    logger.info("Done ingest")

    return summary
def main1():
    """Lump codes: create the greet Interface inside a Blocks context and launch both."""
    # NOTE(review): the original indentation was lost; the Interface calls are
    # assumed to sit inside the ``with`` block — confirm.
    with gr.Blocks() as blocks_app:
        greet_iface = gr.Interface(fn=greet, inputs="text", outputs="text")
        greet_iface.launch()
    blocks_app.launch()
def main():
    """Build the Blocks UI (greeting form plus multi-file upload) and launch it."""
    with gr.Blocks() as demo:
        # Greeting form: name in, greeting out, also exposed as the "greet" API.
        name_box = gr.Textbox(label="Name")
        submit_btn = gr.Button("Submit")
        result_box = gr.Textbox(label="Output Box")
        submit_btn.click(
            fn=greet,
            inputs=name_box,
            outputs=result_box,
            api_name="greet",
        )

        # Upload area: uploaded files are passed to ``upload_files`` and the
        # returned paths are shown in the File component.
        uploaded_view = gr.File()
        uploader = gr.UploadButton(
            "Click to upload files",
            # file_types=["*.pdf", "*.epub", "*.docx"],
            file_count="multiple",
        )
        uploader.upload(upload_files, uploader, uploaded_view)

    demo.launch()
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()