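# Spaces: Sleeping, Sleeping
# (The line above appears to be a status banner copied from the hosting web page; it is not program text.)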
""" | |
BOOK BUDDY β Ask questions about your PDFs | |
This file is written with super-simple names and big comments, | |
so a kid can read it and understand whatβs going on. | |
How it works: | |
1) We read your PDF and cut it into small text pieces. | |
2) We make "numbers" (embeddings) for each piece so we can search fast. | |
3) When you ask a question, we find the best pieces and give them | |
to a friendly robot model (Mistral) to make a short answer. | |
4) We also show which book files we used (sources). | |
""" | |
import logging
import os
from typing import List, Tuple

import gradio as gr
import numpy as np
from huggingface_hub import InferenceClient
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
# ====== SETTINGS YOU CAN CHANGE ======
# Name of the hosted model that writes the answers; override with the GEN_MODEL env var.
ROBOT_MODEL = os.getenv("GEN_MODEL", "mistralai/Mistral-7B-Instruct-v0.2")
# Access token read from the environment (None when the variable is not set).
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
# Sentence-embedding model used to turn text into vectors.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
PIECE_SIZE = 900     # how big each text piece is (in characters)
PIECE_OVERLAP = 150  # how much neighbouring pieces overlap (in characters)
TOP_K = 4            # how many pieces we use to answer
# ====== TRY FAISS (fast search). IF NOT, USE SIMPLE NUMPY SEARCH ======
# The catch is deliberately broad: any failure while importing the optional
# dependency should switch the app to the NumPy fallback instead of stopping it.
try:
    import faiss  # fast similarity search
except Exception:
    USE_FAISS = False
else:
    USE_FAISS = True
# ====== GLOBAL MEMORY (lives while the app is running) ======
# Embedding model object, created once when this module is imported.
make_numbers = SentenceTransformer(EMBEDDING_MODEL)
faiss_index = None  # vector index, used if FAISS works
all_vectors = None  # NumPy matrix of vectors, used if FAISS doesn't work
# Text pieces and the filename each one came from. Position i in both lists
# belongs to vector i in the search memory, so the three must stay in step.
all_pieces: List[str] = []
all_files: List[str] = []
# Client used to send prompts to the hosted answer model.
client = InferenceClient(model=ROBOT_MODEL, token=HF_TOKEN)
# A friendly rule for the robot: this text is sent as the system prompt
# (and is pasted into the fallback prompt) every time a question is asked.
ROBOT_RULES = (
    "You are a helpful assistant. Use the given CONTEXT to answer the QUESTION.\n"
    "!!IMPORTANT!! - If the answer is not in the context, Strictly say 'I don't know.', Do not respond any other answers!!!\n"
    "Be short and add source filenames at the end like [source: file.pdf]."
)
# ====== LITTLE HELPER FUNCTIONS ======
def read_pdf_text(path: str) -> str:
    """Open a PDF and return all the text inside.

    Args:
        path: Location of the PDF file on disk.

    Returns:
        The text of every page, joined with newlines. A page that yields
        no text contributes an empty string.
    """
    reader = PdfReader(path)
    # extract_text() may give back nothing for a page; treat that as "".
    page_texts = [(page.extract_text() or "") for page in reader.pages]
    return "\n".join(page_texts)
def cut_into_pieces(big_text: str, size: int, overlap: int) -> List[str]:
    """Cut text into small overlapping pieces (like puzzle pieces).

    Each piece starts ``size - overlap`` characters after the previous one
    and is at most ``size`` characters long. Pieces are stripped of
    surrounding whitespace, and pieces that end up empty are skipped.

    Args:
        big_text: The full text to cut up.
        size: Maximum length of one piece, in characters.
        overlap: How many characters neighbouring pieces share.

    Returns:
        The non-empty pieces, in reading order.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``size``. The window
            would then never move forward and the cutting would never end.
    """
    step = size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than size ({size})"
        )

    pieces = []
    for start in range(0, len(big_text), step):
        chunk = big_text[start:start + size].strip()
        if chunk:
            pieces.append(chunk)
    return pieces
def embed_texts(texts: List[str]) -> np.ndarray:
    """Turn text into numbers so we can search by meaning.

    Args:
        texts: The strings to embed.

    Returns:
        A float32 NumPy array produced by the embedding model, with
        normalization requested so that inner products can stand in for
        cosine similarity.
    """
    vectors = make_numbers.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    return np.asarray(vectors, dtype=np.float32)
def start_memory(dim: int) -> None:
    """Create the place where we store the vectors (FAISS or NumPy).

    Args:
        dim: Length of one embedding vector.
    """
    global faiss_index, all_vectors
    # The NumPy store always starts empty; add_to_memory fills it in fallback mode.
    all_vectors = None
    # inner product = cosine because the vectors are normalized
    faiss_index = faiss.IndexFlatIP(dim) if USE_FAISS else None
def add_to_memory(vectors: np.ndarray) -> None:
    """Put new vectors into our memory.

    In FAISS mode the vectors are added to ``faiss_index``; otherwise they
    are stacked under whatever is already stored in ``all_vectors``.

    Args:
        vectors: One embedding per row.
    """
    global all_vectors
    if USE_FAISS:
        faiss_index.add(vectors)
        return
    if all_vectors is None:
        all_vectors = vectors
    else:
        all_vectors = np.vstack([all_vectors, vectors])
def search_best_pieces(query_vector: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
    """Find the k best matching pieces for the question.

    Args:
        query_vector: The embedded question; row 0 is the query.
        k: How many matches to return.

    Returns:
        A ``(scores, indices)`` pair. In the NumPy fallback both arrays have
        a leading axis of length 1 and are ordered best match first; there
        are fewer than ``k`` entries when fewer vectors are stored.
    """
    if USE_FAISS:
        return faiss_index.search(query_vector, k)  # returns (distances, indices)

    # cosine/IP because the stored vectors and the query are normalized
    scores = all_vectors @ query_vector[0]
    best = np.argsort(-scores)[:k]  # negate so the highest score sorts first
    return scores[best][None, :], best[None, :]
# ====== MAIN ACTIONS THE BUTTONS CALL ======
def reset_everything():
    """Clear all memory (like starting fresh).

    Returns:
        A status message telling the user what to do next.
    """
    global faiss_index, all_vectors, all_pieces, all_files
    faiss_index, all_vectors = None, None
    all_pieces, all_files = [], []
    return "Cleared! Upload PDFs again and click Build Index."
def build_memory_from_pdfs(files) -> str:
    """Read PDFs, cut them into pieces, turn the pieces to numbers, store them.

    Args:
        files: The uploaded files from the upload widget. Each entry may be
            an object with a ``.name`` path attribute or a plain path
            string. ``None`` (nothing uploaded) is treated as an empty list.

    Returns:
        A status message: how many pieces were indexed, or a hint to try
        another PDF when no text was found.
    """
    global all_pieces, all_files

    files = files or []  # clicking the button with nothing uploaded must not crash
    new_pieces, new_files = [], []

    # 1) read + cut
    for f in files:
        path = getattr(f, "name", f)  # file object with .name, or already a path
        pieces = cut_into_pieces(read_pdf_text(path), PIECE_SIZE, PIECE_OVERLAP)
        new_pieces.extend(pieces)
        new_files.extend([os.path.basename(path)] * len(pieces))

    if not new_pieces:
        all_pieces, all_files = [], []
        return "No text found. Try another PDF."

    # 2) embeddings + memory
    E = embed_texts(new_pieces)
    start_memory(E.shape[1])
    add_to_memory(E)

    # Publish the new pieces only after the vectors are stored, so a failure
    # above never leaves the piece list describing vectors that don't exist.
    all_pieces, all_files = new_pieces, new_files
    return f"Indexed {len(all_pieces)} pieces from {len(files)} file(s)."
def ask_robot(question: str) -> str:
    """Search the best pieces and ask the robot model to answer.

    Args:
        question: What the user typed. ``None``, empty, or whitespace-only
            input gets a reminder instead of an answer.

    Returns:
        The model's answer followed by a ``Sources:`` line, or a short
        instruction when there is no question or no index yet.

    Raises:
        Exception: Whatever the fallback text-generation call raises when
            both ways of reaching the model fail.
    """
    if not question or not question.strip():
        return "Type a question in the box."

    index_missing = (faiss_index is None) if USE_FAISS else (all_vectors is None)
    if index_missing or not all_pieces:
        return "Upload PDFs and press **Build Index** first."

    # 1) find helpful pieces
    qv = embed_texts([question])
    _, idxs = search_best_pieces(qv, TOP_K)
    # Negative ids are placeholders, not real pieces (FAISS returns -1 when it
    # has fewer than k results), so drop them.
    ids = [i for i in idxs[0].tolist() if i >= 0]

    # 2) build the context we give to the robot
    context_blocks = []
    used_files = []
    for rank, i in enumerate(ids, start=1):
        snippet = all_pieces[i][:1000]
        fname = all_files[i]
        context_blocks.append(f"[{rank}] {fname}\n{snippet}")
        used_files.append(fname)
    context_text = "\n\n---\n".join(context_blocks)

    # 3) talk to the robot on Hugging Face
    messages = [
        {"role": "system", "content": ROBOT_RULES},
        {"role": "user", "content": f"QUESTION: {question}\n\nCONTEXT:\n{context_text}"},
    ]
    # Plan A: chat-completions (most models)
    try:
        resp = client.chat.completions.create(
            model=ROBOT_MODEL,
            messages=messages,
            max_tokens=512,
            temperature=0.2,
            top_p=0.95,
        )
        out = resp.choices[0].message.content
    except Exception as chat_error:
        # Falling back is deliberate best-effort, but leave a trace of why.
        logging.getLogger(__name__).warning(
            "Chat completion failed (%s); falling back to text generation.",
            chat_error,
        )
        # Plan B: plain text generation (some endpoints)
        prompt = f"[INST] {ROBOT_RULES}\n\nQUESTION: {question}\n\nCONTEXT:\n{context_text}\n[/INST]"
        out = client.text_generation(
            prompt,
            max_new_tokens=512,
            temperature=0.2,
            top_p=0.95,
            repetition_penalty=1.05,
            do_sample=True,
            return_full_text=False,
        )

    # 4) add sources (the book files we used)
    answer_text = (out or "").strip()  # guard against a reply with no text at all
    unique_sources = ", ".join(sorted(set(used_files))) if used_files else "N/A"
    return f"{answer_text}\n\nSources: {unique_sources}"
# ====== THE SIMPLE WEB PAGE ======
# Left column: upload + build/reset buttons + status line.
# Right column: question box, example questions, ask button, answer.
# NOTE(review): the emoji, dashes and apostrophe in the labels below were
# rebuilt from mis-decoded characters — confirm they match the intended icons.
with gr.Blocks(title="📚 Book Buddy — Ask your PDFs") as demo:
    gr.Markdown(
        "## 📚 Book Buddy\n"
        "1) Upload your PDF book. 2) Press **Build Index** (Book Buddy learns!). "
        "3) Ask your question. 4) Look at **Sources** to see which file was used.\n"
        "_Tip: start with one small PDF so it’s fast._"
    )
    with gr.Row():
        with gr.Column(scale=1):
            pdfs = gr.File(file_count="multiple", file_types=[".pdf"], label="Upload PDF books")
            build_btn = gr.Button("🧠 Build Index", variant="primary")
            reset_btn = gr.Button("🔄 Reset")
            status = gr.Markdown()
        with gr.Column(scale=2):
            q = gr.Textbox(label="Ask a question", placeholder="Example: Give me 3 key points from this book.")
            examples = gr.Examples(
                examples=[
                    ["Summarize the main idea in 2 sentences."],
                    ["List 3 important facts from this book."],
                ],
                inputs=q,
            )
            ask_btn = gr.Button("➡️ Ask")
            answer = gr.Markdown()

    # Wire the buttons (and pressing Enter in the question box) to the actions.
    build_btn.click(build_memory_from_pdfs, inputs=[pdfs], outputs=[status])
    reset_btn.click(fn=reset_everything, inputs=None, outputs=[status])
    ask_btn.click(ask_robot, inputs=[q], outputs=[answer])
    q.submit(ask_robot, inputs=[q], outputs=[answer])
# Start the web app only when this file is run directly, not when it is imported.
if __name__ == "__main__":
    demo.launch()