Spaces:
Sleeping
Sleeping
import streamlit as st | |
import torch | |
from langchain import HuggingFacePipeline, PromptTemplate | |
from langchain.chains import RetrievalQA | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import Chroma | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
import os | |
import re | |
import pickle | |
import fitz # PyMuPDF | |
from langchain.schema import Document | |
import langdetect | |
def clean_output(output: str) -> str: | |
print("Raw output:", output) # Debugging line | |
start_index = output.find('[/INST]') + len('[/INST]') | |
cleaned_output = output[start_index:].strip() | |
print("Cleaned output:", cleaned_output) # Debugging line | |
return cleaned_output | |
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" | |
def split_text_into_paragraphs(text_content): | |
paragraphs = text_content.split('#') | |
return [paragraph.strip() for paragraph in paragraphs if paragraph.strip()] | |
def sanitize_filename(filename): | |
sanitized_name = re.sub(r'[^a-zA-Z0-9_-]', '_', filename) | |
return sanitized_name[:63] | |
def extract_text_from_pdf(pdf_path): | |
text_content = '' | |
with fitz.open(pdf_path) as pdf_document: | |
for page_num in range(len(pdf_document)): | |
page = pdf_document[page_num] | |
text_content += page.get_text() | |
return text_content | |
def detect_language(text): | |
try: | |
return langdetect.detect(text) | |
except: | |
return "en" # Default to English if detection fails | |
def process_pdf_file(filename, pdf_path, embeddings, llm, prompt): | |
print(f'\nProcessing: {pdf_path}') | |
text_content = extract_text_from_pdf(pdf_path) | |
language = detect_language(text_content) | |
print(f"Detected language: {language}") | |
paragraphs = split_text_into_paragraphs(text_content) | |
documents = [Document(page_content=paragraph, metadata={"language": language, "source": filename}) for paragraph in paragraphs] | |
print(f"Number of documents created: {len(documents)}") | |
collection_name = sanitize_filename(os.path.basename(filename)) | |
db = Chroma.from_documents(documents, embeddings, collection_name=collection_name) | |
retriever = db.as_retriever(search_kwargs={"k": 2}) | |
qa_chain = RetrievalQA.from_chain_type( | |
llm=llm, | |
chain_type="stuff", | |
retriever=retriever, | |
return_source_documents=True, | |
chain_type_kwargs={"prompt": prompt}, | |
) | |
print(f"QA chain created for {filename}") | |
return qa_chain, language | |
SYSTEM_PROMPT = """ | |
Use the provided context to answer the question clearly and concisely. Do not repeat the context in your answer. | |
""" | |
def generate_prompt(prompt: str, system_prompt: str = SYSTEM_PROMPT) -> str: | |
return f""" | |
[INST] <> | |
{system_prompt} | |
<> | |
{prompt} [/INST] | |
""".strip() | |
def main(): | |
# Streamlit UI | |
st.title("PDF-Powered Chatbot") | |
# File Uploader | |
uploaded_files = st.file_uploader("Upload PDF files", type="pdf", accept_multiple_files=True) | |
# Model Loading | |
model_pickle_path = '/kaggle/working/model.pkl' | |
if os.path.exists(model_pickle_path): | |
with open(model_pickle_path, 'rb') as f: | |
model, tokenizer = pickle.load(f) | |
else: | |
MODEL_NAME = "sarvamai/sarvam-2b-v0.5" | |
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True) | |
tokenizer.pad_token = tokenizer.eos_token | |
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE) | |
with open(model_pickle_path, 'wb') as f: | |
pickle.dump((model, tokenizer), f) | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") | |
text_pipeline = pipeline( | |
"text-generation", | |
model=model, | |
tokenizer=tokenizer, | |
max_new_tokens=1024, | |
temperature=0.1, | |
top_p=0.95, | |
repetition_penalty=1.15, | |
device=DEVICE | |
) | |
llm = HuggingFacePipeline(pipeline=text_pipeline, model_kwargs={"temperature": 0}) | |
template = generate_prompt( | |
""" | |
{context} | |
Question: {question} | |
""", | |
system_prompt=SYSTEM_PROMPT, | |
) | |
prompt = PromptTemplate(template=template, input_variables=["context", "question"]) | |
# Initialize QA chains dictionary | |
qa_chains = {} | |
# Process uploaded files | |
if uploaded_files: | |
with st.spinner("Processing PDFs..."): | |
for uploaded_file in uploaded_files: | |
file_path = uploaded_file.name # Use the filename directly | |
qa_chain, doc_language = process_pdf_file(uploaded_file.name, file_path, embeddings, llm, prompt) | |
qa_chains[doc_language] = (qa_chain, uploaded_file.name) | |
st.success("PDFs processed! You can now ask questions.") | |
# Chat interface | |
if st.button("Clear Chat History"): | |
st.session_state.chat_history = [] | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
for message in st.session_state.chat_history: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
if prompt := st.chat_input("Ask your question here"): | |
st.session_state.chat_history.append({"role": "user", "content": prompt}) | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
with st.spinner("Generating response..."): | |
query_language = detect_language(prompt) | |
if query_language in qa_chains: | |
qa_chain, _ = qa_chains[query_language] | |
result = qa_chain({"query": prompt}) | |
cleaned_answer = clean_output(result['result']) | |
with st.chat_message("assistant"): | |
st.markdown(cleaned_answer) | |
st.session_state.chat_history.append({"role": "assistant", "content": cleaned_answer}) | |
else: | |
with st.chat_message("assistant"): | |
st.markdown(f"No document available for the detected language: {query_language}") | |
st.session_state.chat_history.append({"role": "assistant", "content": f"No document available for the detected language: {query_language}"}) | |
if __name__ == "__main__": | |
main() |