# Vira21's picture
# Update app.py
# 5353dbf verified
import os
from typing import List
from chainlit.types import AskFileResponse
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader
from aimakerspace.openai_utils.prompts import (
UserRolePrompt,
SystemRolePrompt,
AssistantRolePrompt,
)
from aimakerspace.openai_utils.embedding import EmbeddingModel
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.chatmodel import ChatOpenAI
import chainlit as cl
import tempfile
import pandas as pd
import pdfplumber
# System prompt: instructs the model to answer strictly from the supplied
# context, to admit when the answer is not there, and to produce an engaging
# summary when the user asks for one. It has no placeholders.
system_template = """\
Use the following context to answer the user's question. If you cannot find the answer in the context,
say you don't know the answer. Additionally, if the user requests a summary or context overview,
generate an engaging and concise summary that captures the main ideas with an interesting and appealing tone.
"""
system_role_prompt = SystemRolePrompt(system_template)
# User prompt: carries two placeholders, {context} and {question}, which are
# filled in through user_role_prompt.create_message(question=..., context=...).
user_prompt_template = """\
Context:
{context}
Question:
{question}
"""
user_role_prompt = UserRolePrompt(user_prompt_template)
class RetrievalAugmentedQAPipeline:
    """Answer questions by retrieving context from a vector database and streaming an LLM reply.

    The pipeline looks up the passages closest to the user's query, places them
    in the user prompt, and streams the chat model's answer.
    """

    # The annotation names the ChatOpenAI class; it must not be a call, since a
    # call expression in an annotation is evaluated when the class is defined.
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase) -> None:
        """Store the collaborators.

        Args:
            llm: Chat model; only its ``astream(messages)`` async iterator is used.
            vector_db_retriever: Vector store; only ``search_by_text(query, k=...)``
                is used.
        """
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        """Retrieve context for ``user_query`` and start a streamed answer.

        Args:
            user_query: The question typed by the user.

        Returns:
            A dict with two keys: ``"response"`` is an async generator yielding
            the chunks produced by ``llm.astream``; ``"context"`` is the raw
            result list returned by ``search_by_text``.
        """
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # Element [0] of every hit is used as its text (presumably hits are
        # (text, score) pairs -- confirm against VectorDatabase). Each passage
        # is terminated by a newline, exactly as the prompt expects.
        context_prompt = "".join(context[0] + "\n" for context in context_list)

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(
            question=user_query, context=context_prompt
        )

        async def generate_response():
            # Nothing is requested from the model until the caller iterates.
            async for chunk in self.llm.astream(
                [formatted_system_prompt, formatted_user_prompt]
            ):
                yield chunk

        return {"response": generate_response(), "context": context_list}
# Single module-level splitter (default settings) shared by all the
# process_*_file helpers below.
text_splitter = CharacterTextSplitter()
def process_text_file(file: AskFileResponse):
    """Split an uploaded plain-text file into chunks.

    Args:
        file: The Chainlit upload. Its bytes are taken from ``file.content``
            when that attribute is present, otherwise they are read from
            ``file.path`` (the attribute ``process_pdf_file`` relies on).

    Returns:
        The list of text chunks produced by the shared ``text_splitter``.
    """
    # The previous code passed `file.read` (an attribute, not bytes) to
    # write(), which cannot succeed. Fetch the real payload instead.
    data = getattr(file, "content", None)
    if data is None:
        with open(file.path, "rb") as source:
            data = source.read()

    # The loader is handed a copy that keeps the ".txt" suffix the original
    # code gave it (TextFileLoader may depend on the extension -- confirm).
    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file:
        temp_file_path = temp_file.name
        temp_file.write(data)

    try:
        documents = TextFileLoader(temp_file_path).load_documents()
        return text_splitter.split_texts(documents)
    finally:
        # delete=False means nobody else removes the copy; do it here so
        # uploads do not pile up in the temp directory.
        os.remove(temp_file_path)
def process_pdf_file(file: AskFileResponse):
    """Extract the text of an uploaded PDF and split it into chunks.

    Args:
        file: The Chainlit upload; the PDF is opened directly from ``file.path``.

    Returns:
        The list of text chunks produced by the shared ``text_splitter``.
    """
    with pdfplumber.open(file.path) as pdf:
        # A page with no extractable text (e.g. a scanned image) may yield
        # None; treat it as empty instead of failing on string concatenation.
        page_texts = [page.extract_text() or "" for page in pdf.pages]

    # Separate pages with a newline so the last word of one page is not fused
    # with the first word of the next.
    extracted_text = "\n".join(page_texts)
    return text_splitter.split_texts([extracted_text])
def process_csv_file(file: AskFileResponse):
    """Turn each row of an uploaded CSV into text and split it into chunks.

    Args:
        file: The Chainlit upload. ``file.content`` is used when present;
            otherwise the CSV is read straight from ``file.path`` (the
            attribute ``process_pdf_file`` relies on).

    Returns:
        The list of text chunks produced by the shared ``text_splitter``; the
        input to the splitter is one space-joined string per CSV row.
    """
    content = getattr(file, "content", None)
    if content is None:
        # No in-memory payload on this upload object: read from disk.
        df = pd.read_csv(file.path)
    else:
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".csv") as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(content)
        try:
            df = pd.read_csv(temp_file_path)
        finally:
            # delete=False leaves cleanup to us.
            os.remove(temp_file_path)

    # Every cell is stringified and the cells of a row are joined by spaces.
    texts = df.apply(lambda row: ' '.join(row.astype(str)), axis=1).tolist()
    return text_splitter.split_texts(texts)
@cl.on_chat_start
async def on_chat_start():
    """Ask the user for files, extract their text, and stash it in the session.

    The extracted chunks are stored under the ``"all_texts"`` session key.
    Files with an unsupported extension, or files that fail to process, are
    reported to the user and skipped; the remaining files are still handled.
    """
    cl.user_session.set("all_texts", [])
    files = await cl.AskFileMessage(
        content="Please upload one or more Text, PDF, or CSV files to begin!",
        accept=["text/plain", "application/pdf", "text/csv"],
        max_size_mb=20,
        # Without max_files the widget accepts a single file, which
        # contradicts the "one or more" prompt above.
        max_files=10,
        timeout=180,
    ).send()
    if not files:
        await cl.Message(content="No files were uploaded. Please upload at least one file to proceed.").send()
        return

    # File extension -> function that turns the upload into text chunks.
    processors = {
        "txt": process_text_file,
        "pdf": process_pdf_file,
        "csv": process_csv_file,
    }

    all_texts = cl.user_session.get("all_texts", [])
    for file in files:
        file_type = file.name.split(".")[-1].lower()
        msg = cl.Message(content=f"Processing `{file.name}`...")
        await msg.send()

        processor = processors.get(file_type)
        if processor is None:
            await cl.Message(content=f"Unsupported file type: `{file.name}`. Please upload text, PDF, or CSV files.").send()
            continue

        try:
            texts = processor(file)
        except Exception as exc:
            # Handler boundary: tell the user which file failed and keep
            # going, so one bad upload does not discard the others.
            await cl.Message(content=f"Could not process `{file.name}`: {exc}").send()
            continue

        all_texts.extend(texts)  # Combine texts from all uploaded files

    cl.user_session.set("all_texts", all_texts)

    if not all_texts:
        # Avoid announcing success when there is nothing to query.
        await cl.Message(content="No text could be extracted from the uploaded files.").send()
        return
    await cl.Message(content="Files processed! You can now start asking questions.").send()
@cl.on_message
async def main(message):
    """Answer an incoming chat message, streaming the reply token by token.

    The QA pipeline is built on first use from the session's ``"all_texts"``
    and cached under the ``"chain"`` session key; later messages reuse it.
    If no texts are available the user is asked to upload a file instead.
    """
    pipeline = cl.user_session.get("chain")

    if not pipeline:
        uploaded_texts = cl.user_session.get("all_texts")
        if not uploaded_texts:
            await cl.Message(content="Please upload at least one file before asking questions.").send()
            return

        # Index the uploaded chunks, then wire the store and the chat model
        # into a pipeline that is remembered for the rest of the session.
        store = await VectorDatabase().abuild_from_list(uploaded_texts)
        pipeline = RetrievalAugmentedQAPipeline(
            llm=ChatOpenAI(),
            vector_db_retriever=store,
        )
        cl.user_session.set("chain", pipeline)

    reply = cl.Message(content="")
    outcome = await pipeline.arun_pipeline(message.content)

    # Forward each streamed chunk to the UI, then finalize the message.
    async for token in outcome["response"]:
        await reply.stream_token(token)
    await reply.send()