# Spaces:
# Build error
# Build error
import os
import pickle
import re
import tempfile

import fitz  # PyMuPDF
import langdetect
import streamlit as st
import torch
from langchain import HuggingFacePipeline, PromptTemplate
from langchain.chains import RetrievalQA
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
def clean_output(output: str) -> str:
    """Return the text that follows the first ``[/INST]`` marker.

    Everything up to and including the first ``[/INST]`` is dropped and the
    remainder is stripped of surrounding whitespace. When the marker is not
    present at all, the whole input is returned (stripped) instead of being
    truncated.

    Args:
        output: Raw text produced by the QA chain.

    Returns:
        The cleaned answer text.
    """
    print("Raw output:", output)  # Debugging line
    # partition() reports whether the marker was found; the previous
    # find()-based slice started at index 6 when find() returned -1 and
    # therefore cut off the first six characters of marker-less output.
    _, marker, answer = output.partition('[/INST]')
    cleaned_output = (answer if marker else output).strip()
    print("Cleaned output:", cleaned_output)  # Debugging line
    return cleaned_output
# Device string handed to the model and pipeline: the first CUDA GPU when
# torch reports one as available, otherwise the CPU.
DEVICE = "cpu" if not torch.cuda.is_available() else "cuda:0"
def split_text_into_paragraphs(text_content):
    """Split *text_content* on ``#`` and return the non-blank pieces.

    Each piece is stripped of surrounding whitespace; pieces that are empty
    after stripping are discarded.
    """
    trimmed_chunks = (chunk.strip() for chunk in text_content.split('#'))
    return [chunk for chunk in trimmed_chunks if chunk]
def sanitize_filename(filename):
    """Return *filename* reduced to ``[a-zA-Z0-9_-]`` and at most 63 characters.

    Every character outside that set is replaced by an underscore before the
    result is truncated.
    """
    disallowed = re.compile(r'[^a-zA-Z0-9_-]')
    underscored = disallowed.sub('_', filename)
    return underscored[:63]
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at *pdf_path*, concatenated.

    Pages are read in document order with PyMuPDF and their text is joined
    without any separator.
    """
    with fitz.open(pdf_path) as pdf_document:
        page_texts = [page.get_text() for page in pdf_document]
    return ''.join(page_texts)
def detect_language(text):
    """Return the language code detected for *text*, or ``"en"`` on failure.

    Detection is best-effort: any ordinary error raised while detecting
    falls back to English so callers always receive a code.

    Args:
        text: The text to classify.

    Returns:
        The value returned by ``langdetect.detect``, or ``"en"``.
    """
    try:
        return langdetect.detect(text)
    except Exception:
        # A bare ``except:`` would also trap KeyboardInterrupt and SystemExit;
        # ``Exception`` keeps the fallback without blocking interpreter exit.
        return "en"  # Default to English if detection fails
def process_pdf_file(filename, pdf_path, embeddings, llm, prompt):
    """Build a retrieval QA chain over the contents of one PDF.

    The PDF text is extracted, its language detected, and the text split into
    paragraphs; each paragraph becomes a ``Document`` tagged with the language
    and *filename*. The documents are indexed in a Chroma collection named
    after the sanitized base name of *filename*, and a "stuff" RetrievalQA
    chain is created on a retriever that fetches 2 documents per query.

    Args:
        filename: Name recorded as each document's ``source`` and used to
            derive the collection name.
        pdf_path: Path of the PDF to read.
        embeddings: Embedding function handed to Chroma.
        llm: Language model used by the chain.
        prompt: Prompt passed to the chain through ``chain_type_kwargs``.

    Returns:
        A ``(qa_chain, language)`` tuple.
    """
    print(f'\nProcessing: {pdf_path}')
    raw_text = extract_text_from_pdf(pdf_path)

    language = detect_language(raw_text)
    print(f"Detected language: {language}")

    documents = []
    for paragraph in split_text_into_paragraphs(raw_text):
        # A separate metadata dict per document, so none of them share state.
        metadata = {"language": language, "source": filename}
        documents.append(Document(page_content=paragraph, metadata=metadata))
    print(f"Number of documents created: {len(documents)}")

    vector_store = Chroma.from_documents(
        documents,
        embeddings,
        collection_name=sanitize_filename(os.path.basename(filename)),
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(search_kwargs={"k": 2}),
        return_source_documents=True,
        chain_type_kwargs={"prompt": prompt},
    )
    print(f"QA chain created for {filename}")
    return qa_chain, language
# Default system instruction inserted into the system section of each prompt.
SYSTEM_PROMPT = """
Use the provided context to answer the question clearly and concisely. Do not repeat the context in your answer.
"""


def generate_prompt(prompt: str, system_prompt: str = SYSTEM_PROMPT) -> str:
    """Wrap *prompt* and *system_prompt* in an ``[INST]`` instruction block.

    The system text is enclosed in ``<<SYS>>`` / ``<</SYS>>`` delimiters and
    the user text is followed by the closing ``[/INST]`` marker. The system
    delimiters previously appeared as bare ``<>``, which left the system
    section undelimited.

    Args:
        prompt: The user part of the prompt (may contain template fields).
        system_prompt: The system instruction; defaults to ``SYSTEM_PROMPT``.

    Returns:
        The assembled prompt with surrounding whitespace stripped.
    """
    return f"""
[INST] <<SYS>>
{system_prompt}
<</SYS>>

{prompt} [/INST]
""".strip()
def _load_model_and_tokenizer(model_pickle_path):
    """Return ``(model, tokenizer)``, reusing the pickle cache when it exists."""
    if os.path.exists(model_pickle_path):
        # SECURITY: pickle.load runs arbitrary code embedded in the file.
        # Only acceptable while this path is writable by this app alone.
        with open(model_pickle_path, 'rb') as f:
            return pickle.load(f)

    model_name = "sarvamai/sarvam-2b-v0.5"
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(model_name).to(DEVICE)

    # The cache is best-effort: skip it when its directory does not exist
    # instead of crashing in open() with FileNotFoundError.
    if os.path.isdir(os.path.dirname(model_pickle_path)):
        with open(model_pickle_path, 'wb') as f:
            pickle.dump((model, tokenizer), f)
    return model, tokenizer


def _process_upload(uploaded_file, embeddings, llm, prompt_template):
    """Write one uploaded PDF to a temporary file and build its QA chain."""
    # process_pdf_file opens its pdf_path argument from disk, but nothing in
    # this file saves the upload; passing only its name pointed at a file
    # that was never written.
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
        tmp.write(uploaded_file.getvalue())
        temp_path = tmp.name
    try:
        return process_pdf_file(
            uploaded_file.name, temp_path, embeddings, llm, prompt_template
        )
    finally:
        os.remove(temp_path)


def main():
    """Run the Streamlit app: upload PDFs, index them, and answer questions.

    Each uploaded PDF gets its own QA chain, stored under the language
    detected for that PDF. A question is routed to the chain whose key equals
    the language detected for the question. Because the dictionary is keyed
    by language, a later PDF in the same language replaces an earlier one.
    """
    # Streamlit UI
    st.title("PDF-Powered Chatbot")

    # File Uploader
    uploaded_files = st.file_uploader("Upload PDF files", type="pdf", accept_multiple_files=True)

    # Model Loading
    model, tokenizer = _load_model_and_tokenizer('/kaggle/working/model.pkl')
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    text_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=1024,
        temperature=0.1,
        top_p=0.95,
        repetition_penalty=1.15,
        device=DEVICE
    )
    llm = HuggingFacePipeline(pipeline=text_pipeline, model_kwargs={"temperature": 0})

    template = generate_prompt(
        """
{context}
Question: {question}
""",
        system_prompt=SYSTEM_PROMPT,
    )
    # Kept under its own name so the chat input below cannot overwrite it.
    prompt_template = PromptTemplate(template=template, input_variables=["context", "question"])

    # Initialize QA chains dictionary
    qa_chains = {}

    # Process uploaded files
    if uploaded_files:
        with st.spinner("Processing PDFs..."):
            for uploaded_file in uploaded_files:
                qa_chain, doc_language = _process_upload(
                    uploaded_file, embeddings, llm, prompt_template
                )
                qa_chains[doc_language] = (qa_chain, uploaded_file.name)
        st.success("PDFs processed! You can now ask questions.")

    # Chat interface
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []

    for message in st.session_state.chat_history:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if user_query := st.chat_input("Ask your question here"):
        st.session_state.chat_history.append({"role": "user", "content": user_query})
        with st.chat_message("user"):
            st.markdown(user_query)

        with st.spinner("Generating response..."):
            query_language = detect_language(user_query)
            if query_language in qa_chains:
                qa_chain, _ = qa_chains[query_language]
                result = qa_chain({"query": user_query})
                answer = clean_output(result['result'])
            else:
                answer = f"No document available for the detected language: {query_language}"
            with st.chat_message("assistant"):
                st.markdown(answer)
            st.session_state.chat_history.append({"role": "assistant", "content": answer})


if __name__ == "__main__":
    main()