#-----Import Required Libraries-----#
import os
from dotenv import load_dotenv
import openai
import fitz # PyMuPDF
import pandas as pd
from transformers import pipeline
from qdrant_client import QdrantClient
from qdrant_client.http import models as qdrant_models
import chainlit as cl
import tiktoken
# Specific imports from the libraries
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Qdrant
from langchain.prompts import ChatPromptTemplate
from operator import itemgetter
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
import glob
#-----Set Environment Variables-----#
load_dotenv()
# Load environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Initialize OpenAI client after loading the environment variables
openai.api_key = OPENAI_API_KEY
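# Note: the langchain_openai classes used below (OpenAIEmbeddings, ChatOpenAI) also read
# OPENAI_API_KEY from the environment, so this explicit assignment mainly covers any
# direct use of the openai package.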
#-----Document Loading and Processing -----#
# Load all PDF files from the specified directory
pdf_files = glob.glob("/home/user/app/data/*.pdf")
# Initialize an empty list to hold all documents
documents = []
# Load each PDF file and append its documents to the list
for pdf_file in pdf_files:
    loader = PyMuPDFLoader(pdf_file)
    documents.extend(loader.load())
# Split the documents into chunks
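# Length is measured in tokens (via tiktoken) rather than characters, so the
# chunk_size and chunk_overlap values below are expressed in model tokens.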
def tiktoken_len(text):
    tokens = tiktoken.encoding_for_model("gpt-4o").encode(text)
    return len(tokens)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100,
    length_function=tiktoken_len,
)
split_chunks = text_splitter.split_documents(documents)
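# Optional sanity check (sketch): uncomment to inspect the first chunk and its token count.
# if split_chunks:
#     print(split_chunks[0].page_content[:200])
#     print("Token count:", tiktoken_len(split_chunks[0].page_content))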
#-----Embedding and Vector Store Setup-----#
# Load OpenAI Embeddings Model
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Check that the embeddings model works as expected
try:
    test_text = "Sample text for embedding."
    test_embedding = embeddings.embed_query(test_text)
    print(f"Test embedding generated successfully: {test_embedding[:5]}...")  # Print a part of the embedding
except Exception as e:
    print(f"Error generating test embedding: {e}")
    exit()
# Creating a Qdrant Vector Store
print(f"Number of split chunks: {len(split_chunks)}")
if len(split_chunks) == 0:
    print("Error: No split chunks found. Please check the document loading and splitting process.")
    exit()
qdrant_vector_store = Qdrant.from_documents(
    split_chunks,
    embeddings,
    location=":memory:",
    collection_name="HUD_FSS_Rules_and_Regs",
)
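# The ":memory:" location keeps the collection in RAM for this process only,
# so the embeddings are rebuilt from the PDFs on every startup.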
# Create a Retriever
retriever = qdrant_vector_store.as_retriever()
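# By default the retriever performs a similarity search and returns the vector store's
# standard top-k results (typically 4); pass search_kwargs={"k": ...} to adjust.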
#-----Prompt Template and Language Model Setup-----#
# Define the prompt template
template = """You are a helpful AI chatbot for HUD Family Self Sufficiency (FSS) Program Managers and FSS Coordinators. You answer questions about HUD FSS rules and regulations and help guide program managers and FSS Coordinators to lead FSS programs that are participant-centered and draw insights from the Compass Working Capital program model.
Draw from your knowledge base wherever possible to answer questions. Your knowledge base includes:
1. Relevant HUD regulations from the Code of Federal Regulations (CFR). This includes CFR Part 887 and CFR Part 984.
2. The FSS Final Rule from 7/13/2023, which also includes Q&A with answers from HUD.
3. The FSS Program Guidebook created by HUD.
You use these resources to help FSS Coordinators with their questions. When communicating with FSS Program Managers and FSS Coordinators, follow these guidelines:
1. Be Client-Centered: Your goal is to help the FSS client be successful and benefit from the FSS program. Write in a way that emphasizes what the client is able to do and how the user can support the client. If the FSS coordinator or FSS program manager can choose to interpret rules and regulations in a way that is advantageous to the FSS client, encourage them to do so. Do not suggest options that strictly adhere to the rules in a way that disadvantages the FSS client when there are options to interpret the rules in the client's favor.
2. Cite Your Sources: When you reference the Code of Federal Regulations (CFR) documents from the knowledge base, include the Part, Subpart, Section, and other identifying information for what you are referencing so the user can learn more. Those documents have clear labels for Parts, Subparts, and Sections, such as § 984.305 (a) (2) (ii). When you pull information from these documents, include those section labels and a quote of the actual text, formatted so it is clear that it is a quote. For other documents, include quotes when they are highly relevant and be sure to name the document they come from. If you do not know the name of the document, do not include the quote.
3. Make the Complex Simple: FSS program manager questions are often quite complex and embedded within a specific client scenario. Provide relevant context from the knowledge base and then adapt it to the specific client scenario. Be clear and concise, but still friendly and supportive in tone.
Generally, a good answer will:
1. Defer first to the content in the HUD regulations and make direct references to them whenever possible. Sometimes questions are worded in a way that suggests that the FSS program has discretion in an area where there is none. Review the regulations first to see what is clearly allowed or not allowed before consulting other sources.
2. Defer second to the program Action Plan. You will not have access to individual programs' Action Plans, but the answer should prompt the user to review their own policies on whatever topic they asked about. You could also reference specific, required Action Plan sections using HUD's Sample Action Plan. If the question relates to an area governed by a local policy decision, encourage the user to consider adopting a flexible, client-centered approach. Remind the user that Action Plan policies can be updated and changed; revised Action Plans need to be approved by HUD.
3. Defer third to other applicable HUD sources like the Guidebook and the FAQs in the FSS Final Rule. If content in the Guidebook and FAQs differs from the HUD regulations, the regulations should be considered correct.
4. Infuse client-centered responses throughout. If the policy in question includes a local policy decision, encourage the user to take a client-centered approach.
Context:
{context}
Question:
{question}
"""
prompt = ChatPromptTemplate.from_template(template)
# Define the primary LLM
primary_llm = ChatOpenAI(model_name="gpt-4o", temperature=0, streaming=True)
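# Note: streaming=True affects how the model generates tokens, but the handler below uses a
# blocking invoke() and sends the full response at once, so replies reach the user as a single message.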
#-----Creating a Retrieval Augmented Generation (RAG) Chain-----#
# The RAG chain:
# (1) Takes the user question and retrieves relevant context,
# (2) Passes the context through unchanged,
# (3) Formats the prompt with context and question, then sends it to the LLM to generate a response
retrieval_augmented_qa_chain = (
    # INVOKE CHAIN WITH: {"question": "<>"}
    # "question" : populated by getting the value of the "question" key
    # "context"  : populated by piping the value of the "question" key into the retriever
    {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
    # "context" : re-assigned via RunnablePassthrough.assign so it is carried through
    # unchanged and remains available in the next step
    | RunnablePassthrough.assign(context=itemgetter("context"))
    # "response" : the "context" and "question" values are used to format the prompt object,
    # which is then piped into the LLM and stored under the "response" key
    # "context"  : populated by getting the value of the "context" key from the previous step
    | {"response": prompt | primary_llm, "context": itemgetter("context")}
)
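# Example usage (a hypothetical question, shown for illustration and not executed here):
# result = retrieval_augmented_qa_chain.invoke({"question": "How is the FSS escrow credit calculated?"})
# print(result["response"].content)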
#-----Chainlit Integration-----#
# Sets initial chat settings at the start of a user session
@cl.on_chat_start
async def start_chat():
    settings = {
        "model": "gpt-4o",
        "temperature": 0,
        "max_tokens": 750,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    cl.user_session.set("settings", settings)
# Processes incoming messages from the user and sends a response through a series of steps:
# (1) Retrieves the user's settings
# (2) Invokes the RAG chain with the user's message
# (3) Extracts the content from the response and sends it back to the user
@cl.on_message
async def handle_message(message: cl.Message):
settings = cl.user_session.get("settings")
response = retrieval_augmented_qa_chain.invoke({"question": message.content})
# Extracting and sending just the content
content = response["response"].content
pretty_content = content.strip() # Remove any leading/trailing whitespace
await cl.Message(content=pretty_content).send()