"""
VoiceVerse AI β€” RAG Pipeline.
Handles document ingestion, text chunking, embedding generation,
and semantic retrieval using an in-memory vector store.
Models used:
- sentence-transformers/all-MiniLM-L6-v2 for embeddings (22 MB, CPU-friendly)
Design decisions:
- NumPy cosine similarity instead of FAISS to avoid heavy native deps
- Overlapping chunks to preserve context across boundaries
- Single-document architecture (clear store on new upload)
"""
import os
import numpy as np
from utils import logger


# ── Text Extraction ──────────────────────────────────────────────────────────

def extract_text(file_path: str) -> str:
    """
    Extract plain text from a PDF or TXT file.

    Returns the full document text as a single string.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pdf":
        return _extract_pdf(file_path)
    elif ext == ".txt":
        return _extract_txt(file_path)
    else:
        raise ValueError(f"Unsupported file type: {ext}")


def _extract_pdf(file_path: str) -> str:
    """Extract text from PDF using PyMuPDF."""
    import fitz  # PyMuPDF (lazy import: only needed for PDF uploads)

    text_parts = []
    with fitz.open(file_path) as doc:
        for page_num, page in enumerate(doc):
            page_text = page.get_text("text")
            if page_text.strip():
                text_parts.append(page_text)
                logger.debug("Extracted page %d: %d chars", page_num + 1, len(page_text))
    full_text = "\n\n".join(text_parts)
    logger.info("PDF extraction complete: %d pages, %d chars total",
                len(text_parts), len(full_text))
    return full_text


def _extract_txt(file_path: str) -> str:
    """Read plain text file with encoding fallback."""
    # latin-1 goes last: it maps every byte, so it never raises and acts as
    # the guaranteed fallback (placing it before cp1252 would make cp1252
    # unreachable)
    for encoding in ("utf-8", "utf-8-sig", "cp1252", "latin-1"):
        try:
            with open(file_path, "r", encoding=encoding) as f:
                text = f.read()
            logger.info("TXT extraction complete (%s): %d chars", encoding, len(text))
            return text
        except UnicodeDecodeError:
            continue
    raise ValueError("Could not decode the text file with any supported encoding.")


# ── Text Chunking ────────────────────────────────────────────────────────────

def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """
    Split text into overlapping chunks of roughly `chunk_size` characters.

    Overlap ensures context isn't lost at chunk boundaries.
    Uses sentence-aware splitting: tries to break at sentence boundaries
    within the chunk window for more coherent chunks.
    """
    if not text or not text.strip():
        return []
    # Collapse all whitespace runs (including newlines) to single spaces
    text = " ".join(text.split())
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        # If not at the end, try to break at a sentence boundary
        if end < len(text):
            # Scan backwards for sentence-ending punctuation, but only within
            # the second half of the window so chunks don't get too short
            search_start = start + chunk_size // 2
            last_period = -1
            for i in range(min(end, len(text)) - 1, search_start - 1, -1):
                if text[i] in ".!?" and (i + 1 >= len(text) or text[i + 1] == " "):
                    last_period = i
                    break
            if last_period > start:
                end = last_period + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        # Step to `overlap` chars before the chunk end; the max() guard
        # guarantees forward progress even for tiny chunks
        start = max(start + 1, end - overlap)
    logger.info("Chunking complete: %d chunks (size=%d, overlap=%d)",
                len(chunks), chunk_size, overlap)
    return chunks
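

# Worked example (illustrative numbers, not taken from this repo's tests):
# with the defaults chunk_size=512 and overlap=50, chunk starts advance by
# end - 50 per iteration, where `end` is either the last ". " / "! " / "? "
# boundary in the second half of the current 512-char window, or start + 512
# if no boundary is found. For boundary-free text that gives starts at
# roughly 0, 462, 924, ...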


# ── Embedding & Vector Store ─────────────────────────────────────────────────

class RAGStore:
    """
    In-memory vector store using sentence-transformers embeddings
    and NumPy cosine similarity.

    Usage:
        store = RAGStore()
        store.add_document("full document text here")
        results = store.query("what is this about?", top_k=5)
    """

    MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self):
        self._model = None
        self.chunks: list[str] = []
        self.embeddings: np.ndarray | None = None

    @property
    def model(self):
        """Lazy-load the embedding model to avoid startup cost."""
        if self._model is None:
            logger.info("Loading embedding model: %s", self.MODEL_NAME)
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.MODEL_NAME)
            logger.info("Embedding model loaded successfully")
        return self._model

    def clear(self):
        """Clear the store for a new document."""
        self.chunks = []
        self.embeddings = None

    def add_document(self, text: str, chunk_size: int = 512, overlap: int = 50):
        """
        Process a document: chunk the text, generate embeddings, and store.

        Clears any previously stored document.
        """
        self.clear()
        self.chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not self.chunks:
            raise ValueError("No text chunks could be extracted from the document.")
        logger.info("Generating embeddings for %d chunks...", len(self.chunks))
        self.embeddings = self.model.encode(
            self.chunks,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True,  # Pre-normalize for faster cosine sim
        )
        logger.info("Embeddings generated: shape %s", self.embeddings.shape)

    def query(self, question: str, top_k: int = 5) -> list[str]:
        """
        Retrieve the top-k most relevant chunks for the given question.

        Uses cosine similarity (dot product on normalized vectors).
        """
        if self.embeddings is None or len(self.chunks) == 0:
            return []
        top_k = min(top_k, len(self.chunks))
        if top_k <= 0:
            return []  # Guard: [-0:] below would slice the *whole* array
        # Embed the query
        query_embedding = self.model.encode(
            [question],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        # Cosine similarity = dot product (vectors are pre-normalized)
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()
        # Get top-k indices, best match first
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks (top similarity: %.3f)",
                    len(results), similarities[top_indices[0]])
        return results

    def get_all_chunks(self) -> list[str]:
        """Return all stored chunks (useful for short documents)."""
        return self.chunks.copy()
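

if __name__ == "__main__":
    # Minimal smoke test, a sketch rather than one of the app's entry points.
    # The sample text and question below are invented for illustration; the
    # first run will download the embedding model from the Hugging Face Hub.
    sample = (
        "VoiceVerse converts documents into narrated audio. "
        "It extracts text, splits it into overlapping chunks, and embeds "
        "each chunk with a sentence-transformers model. At question time it "
        "retrieves the closest chunks by cosine similarity."
    )
    store = RAGStore()
    store.add_document(sample, chunk_size=120, overlap=20)
    for chunk in store.query("How are relevant chunks found?", top_k=2):
        print("-", chunk)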