gourisankar85's picture
Upload app.py
2856a0a verified
import PyPDF2
import faiss
import numpy as np
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# Load the LLM for generation
generation_model_name = 'facebook/bart-large-cnn'
generation_model = AutoModelForSeq2SeqLM.from_pretrained(generation_model_name)
tokenizer = AutoTokenizer.from_pretrained(generation_model_name)
#Specify file paths
file_path1 = './AST-1.pdf'
file_path2 = './AST-2.pdf'
#Step 1 : Load the document files
def read_pdf(file_path):
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ''
for page_num in range(len(reader.pages)):
page = reader.pages[page_num]
text += page.extract_text()
return text
ast1_text = read_pdf(file_path1)
ast2_text = read_pdf(file_path2)
#Step 2 Split the loaded documents into chunks
# Split by Fixed Number of Words:
def chunk_text(text, chunk_size=200):
words = text.split()
chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
return chunks
ast1_chunks = chunk_text(ast1_text, chunk_size=100)
ast2_chunks = chunk_text(ast2_text, chunk_size=150)
#label the chunks
ast1_chunks = [(chunk, 'AST-1') for chunk in ast1_chunks]
ast2_chunks = [(chunk, 'AST-2') for chunk in ast2_chunks]
all_chunks = ast1_chunks + ast2_chunks
print('Created the chunks')
#Load the Embedding Model and LLM
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# Load the pre-trained model from the MTEB leaderboard
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
embeddings = embedding_model.encode(all_chunks, convert_to_tensor=True)
# Convert embeddings to numpy arrays
embeddings_np = np.array([embedding.cpu().numpy() for embedding in embeddings])
# Create a FAISS index
dimension = embeddings_np.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings_np)
# Save the index
faiss.write_index(index, 'embeddings_index.faiss')
# Load FAISS index
stored_index = faiss.read_index('./embeddings_index.faiss')
print('Stored embedding in db')
#Function to retrieve chunks
def retrieve_chunks(query, top_k=10, use_mmr=False, diversity=0.5, target_doc='AST-1'):
query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().numpy()
distances, indices = stored_index.search(np.array([query_embedding]), top_k)
if use_mmr:
# Implement MMR-based retrieval
from sklearn.metrics.pairwise import cosine_similarity
selected_indices = []
selected_distances = []
candidate_indices = [i for i in indices[0] if all_chunks[i][1] == target_doc]
candidate_distances = [distances[0][i] for i in range(len(indices[0])) if all_chunks[indices[0][i]][1] == target_doc]
while len(selected_indices) < top_k and candidate_indices:
if not selected_indices:
selected_indices.append(candidate_indices.pop(0))
selected_distances.append(candidate_distances.pop(0))
else:
remaining_candidates = [candidate_indices[i] for i in range(len(candidate_indices))]
remaining_embeddings = embeddings_np[remaining_candidates]
selected_embeddings = embeddings_np[selected_indices]
similarities = cosine_similarity(remaining_embeddings, selected_embeddings)
mmr_scores = (1 - diversity) * np.array(candidate_distances[:len(remaining_candidates)]) - diversity * np.max(similarities, axis=1)
next_index = np.argmax(mmr_scores)
selected_indices.append(candidate_indices.pop(next_index))
selected_distances.append(candidate_distances.pop(next_index))
return [all_chunks[i][0] for i in selected_indices]
else:
retrieved_chunks = []
for idx in indices[0]:
chunk, doc_label = all_chunks[idx]
if doc_label == target_doc:
retrieved_chunks.append(chunk)
if len(retrieved_chunks) >= top_k:
break
return retrieved_chunks
# Generate response
def generate_response(query, retrieved_chunks):
#context = " ".join([chunk for chunk, _ in retrieved_chunks])
context = " ".join(retrieved_chunks) # for retrieved_chunks as array of strings
input_text = f"Query: {query}\nContext: {context}"
inputs = tokenizer(input_text, return_tensors='pt', max_length=1024, truncation=True)
summary_ids = generation_model.generate(inputs['input_ids'], max_length=150, min_length=40, length_penalty=2.0, num_beams=4, early_stopping=True)
return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def rag_system(query, use_mmr=False):
retrieved_chunks = retrieve_chunks(query, top_k=3, use_mmr=use_mmr)
response = generate_response(query, retrieved_chunks)
print(response)
return response
import gradio as gr
def query_rag_system(query, use_mmr):
return rag_system(query, use_mmr=use_mmr)
interface = gr.Interface(
fn=query_rag_system,
inputs=[
gr.Textbox(lines=2, placeholder="Enter your query here..."),
gr.Checkbox(label="Use MMR")
],
outputs="text",
title="RAG System",
description="Enter a query to get a response from the RAG system. Optionally, use MMR for better results."
)
interface.launch()