Spaces:
Runtime error
Runtime error
import os | |
from typing import List | |
from chainlit.types import AskFileResponse | |
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader | |
from aimakerspace.openai_utils.prompts import ( | |
UserRolePrompt, | |
SystemRolePrompt, | |
AssistantRolePrompt, | |
) | |
from aimakerspace.openai_utils.embedding import EmbeddingModel | |
from aimakerspace.vectordatabase import VectorDatabase | |
from aimakerspace.openai_utils.chatmodel import ChatOpenAI | |
import chainlit as cl | |
import tempfile | |
import pandas as pd | |
import pdfplumber | |
system_template = """\ | |
Use the following context to answer the user's question. If you cannot find the answer in the context, | |
say you don't know the answer. Additionally, if the user requests a summary or context overview, | |
generate an engaging and concise summary that captures the main ideas with an interesting and appealing tone. | |
""" | |
system_role_prompt = SystemRolePrompt(system_template) | |
user_prompt_template = """\ | |
Context: | |
{context} | |
Question: | |
{question} | |
""" | |
user_role_prompt = UserRolePrompt(user_prompt_template) | |
class RetrievalAugmentedQAPipeline: | |
def __init__(self, llm: ChatOpenAI(), vector_db_retriever: VectorDatabase) -> None: | |
self.llm = llm | |
self.vector_db_retriever = vector_db_retriever | |
async def arun_pipeline(self, user_query: str): | |
context_list = self.vector_db_retriever.search_by_text(user_query, k=4) | |
context_prompt = "" | |
for context in context_list: | |
context_prompt += context[0] + "\n" | |
formatted_system_prompt = system_role_prompt.create_message() | |
formatted_user_prompt = user_role_prompt.create_message(question=user_query, context=context_prompt) | |
async def generate_response(): | |
async for chunk in self.llm.astream([formatted_system_prompt, formatted_user_prompt]): | |
yield chunk | |
return {"response": generate_response(), "context": context_list} | |
text_splitter = CharacterTextSplitter() | |
def process_text_file(file: AskFileResponse): | |
with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file: | |
temp_file_path = temp_file.name | |
temp_file.write(file.read) | |
text_loader = TextFileLoader(temp_file_path) | |
documents = text_loader.load_documents() | |
texts = text_splitter.split_texts(documents) | |
return texts | |
def process_pdf_file(file: AskFileResponse): | |
# Use the path attribute to read the file directly | |
temp_file_path = file.path # Get the path of the uploaded file | |
extracted_text = "" | |
with pdfplumber.open(temp_file_path) as pdf: | |
for page in pdf.pages: | |
extracted_text += page.extract_text() | |
texts = text_splitter.split_texts([extracted_text]) | |
return texts | |
def process_csv_file(file: AskFileResponse): | |
with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".csv") as temp_file: | |
temp_file_path = temp_file.name | |
temp_file.write(file.content) | |
df = pd.read_csv(temp_file_path) | |
texts = df.apply(lambda row: ' '.join(row.astype(str)), axis=1).tolist() | |
return text_splitter.split_texts(texts) | |
async def on_chat_start(): | |
cl.user_session.set("all_texts", []) | |
files = await cl.AskFileMessage( | |
content="Please upload one or more Text, PDF, or CSV files to begin!", | |
accept=["text/plain", "application/pdf", "text/csv"], | |
max_size_mb=20, | |
timeout=180, | |
).send() | |
if not files: | |
await cl.Message(content="No files were uploaded. Please upload at least one file to proceed.").send() | |
return | |
all_texts = cl.user_session.get("all_texts", []) | |
for file in files: | |
file_type = file.name.split(".")[-1].lower() | |
msg = cl.Message(content=f"Processing `{file.name}`...") | |
await msg.send() | |
# Process each file based on its type | |
if file_type == "txt": | |
texts = process_text_file(file) | |
elif file_type == "pdf": | |
texts = process_pdf_file(file) | |
elif file_type == "csv": | |
texts = process_csv_file(file) | |
else: | |
await cl.Message(content=f"Unsupported file type: `{file.name}`. Please upload text, PDF, or CSV files.").send() | |
continue | |
all_texts.extend(texts) # Combine texts from all uploaded files | |
cl.user_session.set("all_texts", all_texts) | |
await cl.Message(content="Files processed! You can now start asking questions.").send() | |
async def main(message): | |
chain = cl.user_session.get("chain") | |
if not chain: | |
all_texts = cl.user_session.get("all_texts") | |
if not all_texts: | |
await cl.Message(content="Please upload at least one file before asking questions.").send() | |
return | |
# Create a dict vector store | |
vector_db = VectorDatabase() | |
vector_db = await vector_db.abuild_from_list(all_texts) | |
chat_openai = ChatOpenAI() | |
# Create a chain | |
retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline( | |
vector_db_retriever=vector_db, | |
llm=chat_openai | |
) | |
cl.user_session.set("chain", retrieval_augmented_qa_pipeline) | |
chain = retrieval_augmented_qa_pipeline | |
msg = cl.Message(content="") | |
result = await chain.arun_pipeline(message.content) | |
async for stream_resp in result["response"]: | |
await msg.stream_token(stream_resp) | |
await msg.send() | |