Spaces:
Sleeping
Sleeping
import PyPDF2 | |
import faiss | |
import numpy as np | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
# Load the LLM for generation | |
generation_model_name = 'facebook/bart-large-cnn' | |
generation_model = AutoModelForSeq2SeqLM.from_pretrained(generation_model_name) | |
tokenizer = AutoTokenizer.from_pretrained(generation_model_name) | |
#Specify file paths | |
file_path1 = './AST-1.pdf' | |
file_path2 = './AST-2.pdf' | |
#Step 1 : Load the document files | |
def read_pdf(file_path): | |
with open(file_path, 'rb') as file: | |
reader = PyPDF2.PdfReader(file) | |
text = '' | |
for page_num in range(len(reader.pages)): | |
page = reader.pages[page_num] | |
text += page.extract_text() | |
return text | |
ast1_text = read_pdf(file_path1) | |
ast2_text = read_pdf(file_path2) | |
#Step 2 Split the loaded documents into chunks | |
# Split by Fixed Number of Words: | |
def chunk_text(text, chunk_size=200): | |
words = text.split() | |
chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)] | |
return chunks | |
ast1_chunks = chunk_text(ast1_text, chunk_size=100) | |
ast2_chunks = chunk_text(ast2_text, chunk_size=150) | |
#label the chunks | |
ast1_chunks = [(chunk, 'AST-1') for chunk in ast1_chunks] | |
ast2_chunks = [(chunk, 'AST-2') for chunk in ast2_chunks] | |
all_chunks = ast1_chunks + ast2_chunks | |
print('Created the chunks') | |
#Load the Embedding Model and LLM | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
# Load the pre-trained model from the MTEB leaderboard | |
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
embeddings = embedding_model.encode(all_chunks, convert_to_tensor=True) | |
# Convert embeddings to numpy arrays | |
embeddings_np = np.array([embedding.cpu().numpy() for embedding in embeddings]) | |
# Create a FAISS index | |
dimension = embeddings_np.shape[1] | |
index = faiss.IndexFlatL2(dimension) | |
index.add(embeddings_np) | |
# Save the index | |
faiss.write_index(index, 'embeddings_index.faiss') | |
# Load FAISS index | |
stored_index = faiss.read_index('./embeddings_index.faiss') | |
print('Stored embedding in db') | |
#Function to retrieve chunks | |
def retrieve_chunks(query, top_k=10, use_mmr=False, diversity=0.5, target_doc='AST-1'): | |
query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().numpy() | |
distances, indices = stored_index.search(np.array([query_embedding]), top_k) | |
if use_mmr: | |
# Implement MMR-based retrieval | |
from sklearn.metrics.pairwise import cosine_similarity | |
selected_indices = [] | |
selected_distances = [] | |
candidate_indices = [i for i in indices[0] if all_chunks[i][1] == target_doc] | |
candidate_distances = [distances[0][i] for i in range(len(indices[0])) if all_chunks[indices[0][i]][1] == target_doc] | |
while len(selected_indices) < top_k and candidate_indices: | |
if not selected_indices: | |
selected_indices.append(candidate_indices.pop(0)) | |
selected_distances.append(candidate_distances.pop(0)) | |
else: | |
remaining_candidates = [candidate_indices[i] for i in range(len(candidate_indices))] | |
remaining_embeddings = embeddings_np[remaining_candidates] | |
selected_embeddings = embeddings_np[selected_indices] | |
similarities = cosine_similarity(remaining_embeddings, selected_embeddings) | |
mmr_scores = (1 - diversity) * np.array(candidate_distances[:len(remaining_candidates)]) - diversity * np.max(similarities, axis=1) | |
next_index = np.argmax(mmr_scores) | |
selected_indices.append(candidate_indices.pop(next_index)) | |
selected_distances.append(candidate_distances.pop(next_index)) | |
return [all_chunks[i][0] for i in selected_indices] | |
else: | |
retrieved_chunks = [] | |
for idx in indices[0]: | |
chunk, doc_label = all_chunks[idx] | |
if doc_label == target_doc: | |
retrieved_chunks.append(chunk) | |
if len(retrieved_chunks) >= top_k: | |
break | |
return retrieved_chunks | |
# Generate response | |
def generate_response(query, retrieved_chunks): | |
#context = " ".join([chunk for chunk, _ in retrieved_chunks]) | |
context = " ".join(retrieved_chunks) # for retrieved_chunks as array of strings | |
input_text = f"Query: {query}\nContext: {context}" | |
inputs = tokenizer(input_text, return_tensors='pt', max_length=1024, truncation=True) | |
summary_ids = generation_model.generate(inputs['input_ids'], max_length=150, min_length=40, length_penalty=2.0, num_beams=4, early_stopping=True) | |
return tokenizer.decode(summary_ids[0], skip_special_tokens=True) | |
def rag_system(query, use_mmr=False): | |
retrieved_chunks = retrieve_chunks(query, top_k=3, use_mmr=use_mmr) | |
response = generate_response(query, retrieved_chunks) | |
print(response) | |
return response | |
import gradio as gr | |
def query_rag_system(query, use_mmr): | |
return rag_system(query, use_mmr=use_mmr) | |
interface = gr.Interface( | |
fn=query_rag_system, | |
inputs=[ | |
gr.Textbox(lines=2, placeholder="Enter your query here..."), | |
gr.Checkbox(label="Use MMR") | |
], | |
outputs="text", | |
title="RAG System", | |
description="Enter a query to get a response from the RAG system. Optionally, use MMR for better results." | |
) | |
interface.launch() |