VedaMD-Backend-v2 / scripts /build_vector_store.py
sniro23's picture
Production ready: Clean codebase + Cerebras + Automated pipeline
b4971bd
#!/usr/bin/env python3
"""
Automated Vector Store Builder for VedaMD
==========================================
This script automates the complete vector store creation process:
1. Scans directory for PDF documents
2. Extracts text using best available method (PyMuPDF → pdfplumber fallback; OCR is not implemented yet)
3. Smart chunking with medical section awareness
4. Batch embedding generation
5. FAISS index creation
6. Metadata generation (source file, section, chunk size, content hash, extraction method)
7. Automatic Hugging Face Hub upload
8. Configuration file generation
Usage:
python scripts/build_vector_store.py \\
--input-dir ./Obs \\
--output-dir ./data/vector_store \\
--repo-id sniro23/VedaMD-Vector-Store \\
--upload
Author: VedaMD Team
Date: October 22, 2025
Version: 1.0.0
"""
import os
import sys
import json
import hashlib
import logging
import argparse
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import warnings
# PDF processing
try:
import fitz # PyMuPDF
HAS_PYMUPDF = True
except ImportError:
HAS_PYMUPDF = False
warnings.warn("PyMuPDF not available. Install with: pip install PyMuPDF")
try:
import pdfplumber
HAS_PDFPLUMBER = True
except ImportError:
HAS_PDFPLUMBER = False
warnings.warn("pdfplumber not available. Install with: pip install pdfplumber")
# Embeddings and vector store
try:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
HAS_EMBEDDINGS = True
except ImportError:
HAS_EMBEDDINGS = False
raise ImportError("Required packages not installed. Run: pip install sentence-transformers faiss-cpu numpy")
# Hugging Face Hub
try:
from huggingface_hub import HfApi, create_repo
HAS_HF = True
except ImportError:
HAS_HF = False
warnings.warn("Hugging Face Hub not available. Install with: pip install huggingface-hub")
# Setup logging: every record is written both to stdout and to a log file
# created in the current working directory.
_console_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler('vector_store_build.log')
logging.basicConfig(
    handlers=[_console_handler, _file_handler],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class PDFExtractor:
    """Handles PDF text extraction with multiple fallback methods.

    The class is a namespace of static methods. Every extractor returns
    ``(text, metadata)`` where ``text`` is the concatenation of all pages,
    each page preceded by a ``--- Page N ---`` marker line, and ``metadata``
    contains at least ``method`` and ``pages``.
    """

    @staticmethod
    def extract_with_pymupdf(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using PyMuPDF (fastest, most reliable).

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            ``(text, metadata)``; ``metadata`` holds ``method``, ``pages``,
            ``title`` and ``author`` (empty strings when the PDF has none).

        Raises:
            ImportError: If PyMuPDF is not installed.
            Exception: Any error raised while reading is logged and re-raised.
        """
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not available")
        logger.info(f"📄 Extracting with PyMuPDF: {pdf_path}")
        metadata = {"method": "pymupdf", "pages": 0}
        try:
            doc = fitz.open(pdf_path)
            try:
                metadata["pages"] = len(doc)
                # The metadata dict, or individual values in it, may be None.
                doc_info = doc.metadata or {}
                metadata["title"] = doc_info.get("title") or ""
                metadata["author"] = doc_info.get("author") or ""
                # Join once instead of growing a string page by page.
                text = "".join(
                    f"\n--- Page {page_num} ---\n{page.get_text()}"
                    for page_num, page in enumerate(doc, 1)
                )
            finally:
                # Release the file handle even when a page fails to parse.
                doc.close()
            logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
            return text, metadata
        except Exception as e:
            logger.error(f"❌ PyMuPDF extraction failed: {e}")
            raise

    @staticmethod
    def extract_with_pdfplumber(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using pdfplumber (better table handling).

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            ``(text, metadata)``; ``metadata`` holds ``method`` and ``pages``.

        Raises:
            ImportError: If pdfplumber is not installed.
            Exception: Any error raised while reading is logged and re-raised.
        """
        if not HAS_PDFPLUMBER:
            raise ImportError("pdfplumber not available")
        logger.info(f"📄 Extracting with pdfplumber: {pdf_path}")
        metadata = {"method": "pdfplumber", "pages": 0}
        try:
            with pdfplumber.open(pdf_path) as pdf:
                metadata["pages"] = len(pdf.pages)
                # extract_text() may return None for a page; treat as empty.
                text = "".join(
                    f"\n--- Page {page_num} ---\n{page.extract_text() or ''}"
                    for page_num, page in enumerate(pdf.pages, 1)
                )
            logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
            return text, metadata
        except Exception as e:
            logger.error(f"❌ pdfplumber extraction failed: {e}")
            raise

    @staticmethod
    def extract_text(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using the best available method with fallbacks.

        PyMuPDF is tried first, then pdfplumber; a backend is skipped when
        its package could not be imported.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The ``(text, metadata)`` pair of the first backend that succeeds.

        Raises:
            RuntimeError: If no backend is installed, or every installed
                backend failed (the message lists each backend's error).
        """
        errors = []

        # Try PyMuPDF first (fastest)
        if HAS_PYMUPDF:
            try:
                return PDFExtractor.extract_with_pymupdf(pdf_path)
            except Exception as e:
                errors.append(f"PyMuPDF: {e}")
                logger.warning("⚠️ PyMuPDF failed, trying pdfplumber...")

        # Fallback to pdfplumber
        if HAS_PDFPLUMBER:
            try:
                return PDFExtractor.extract_with_pdfplumber(pdf_path)
            except Exception as e:
                errors.append(f"pdfplumber: {e}")
                logger.warning("⚠️ pdfplumber failed")

        if not errors:
            # Nothing was attempted, so "all methods failed" would mislead.
            raise RuntimeError(
                "No PDF extraction backend available. "
                "Install with: pip install PyMuPDF pdfplumber"
            )
        raise RuntimeError(f"All extraction methods failed: {'; '.join(errors)}")
class MedicalChunker:
    """Smart chunking with medical section awareness.

    Text is split on blank lines into paragraphs, and paragraphs are packed
    greedily into chunks of at most ``chunk_size`` characters. A paragraph
    longer than ``chunk_size`` is split into smaller pieces first. When
    ``chunk_overlap`` is positive, each new chunk starts with up to
    ``chunk_overlap`` trailing characters of the previous chunk, so a chunk's
    content never exceeds ``chunk_size + chunk_overlap + 2`` characters
    (for a positive ``chunk_size``).
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 100):
        """Store the chunking limits and the list of section keywords.

        Args:
            chunk_size: Target maximum chunk length in characters.
            chunk_overlap: Maximum number of characters carried over from
                the end of one chunk to the start of the next; ``0`` (or a
                negative value) disables overlap.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Medical section headers to preserve
        self.section_markers = [
            "INTRODUCTION", "BACKGROUND", "DEFINITION", "EPIDEMIOLOGY",
            "PATHOPHYSIOLOGY", "CLINICAL FEATURES", "DIAGNOSIS", "MANAGEMENT",
            "TREATMENT", "PREVENTION", "COMPLICATIONS", "PROGNOSIS",
            "REFERENCES", "GUIDELINES", "PROTOCOL", "RECOMMENDATIONS"
        ]

    def _detect_section(self, para: str) -> Optional[str]:
        """Return ``para`` if it looks like a section heading, else ``None``.

        A heading is a paragraph shorter than 100 characters that contains
        one of ``self.section_markers`` (case-insensitive).
        """
        if len(para) >= 100:
            return None
        para_upper = para.upper()
        if any(marker in para_upper for marker in self.section_markers):
            return para
        return None

    def _split_oversized(self, para: str) -> List[str]:
        """Split a paragraph longer than ``chunk_size`` into smaller pieces.

        Each piece is at most ``chunk_size`` characters. The cut is placed
        after a sentence end, a newline or a space when one exists in the
        second half of the window; otherwise the text is cut hard at the
        limit. Paragraphs that already fit are returned unchanged.
        """
        limit = self.chunk_size
        if limit <= 0 or len(para) <= limit:
            return [para]

        pieces = []
        remaining = para
        while len(remaining) > limit:
            window = remaining[:limit]
            cut = limit
            for sep in (". ", "\n", " "):
                pos = window.rfind(sep)
                # Only accept a natural break in the second half of the
                # window, so pieces do not become needlessly small.
                if pos >= limit // 2:
                    cut = pos + len(sep)
                    break
            piece = remaining[:cut].strip()
            if piece:
                pieces.append(piece)
            remaining = remaining[cut:].lstrip()
        if remaining.strip():
            pieces.append(remaining.strip())
        return pieces

    def _overlap_tail(self, chunk: str) -> str:
        """Return the text carried from ``chunk`` into the next chunk.

        The tail is the last two ``'. '``-separated sentences of ``chunk``,
        trimmed to at most ``chunk_overlap`` characters. It is empty when
        overlap is disabled or the chunk has no sentence boundary.
        """
        if self.chunk_overlap <= 0:
            return ""
        sentences = chunk.split('. ')
        if len(sentences) <= 1:
            return ""
        tail = '. '.join(sentences[-2:])
        if len(tail) > self.chunk_overlap:
            tail = tail[-self.chunk_overlap:]
            # Do not start the overlap in the middle of a word.
            first_space = tail.find(" ")
            if first_space != -1:
                tail = tail[first_space + 1:]
        return tail.strip()

    @staticmethod
    def _make_chunk(content: str, source: str, section: str) -> Dict:
        """Build one chunk record; ``size`` is the length of the stored content."""
        content = content.strip()
        return {
            "content": content,
            "source": source,
            "section": section,
            "size": len(content)
        }

    def chunk_text(self, text: str, source: str) -> List[Dict]:
        """Split text into chunks while preserving medical sections.

        Args:
            text: Full document text.
            source: Label stored in every chunk's ``source`` field.

        Returns:
            A list of dicts with keys ``content``, ``source``, ``section``
            and ``size``. ``section`` is the most recent heading seen up to
            and including the chunk's own paragraphs (``"General"`` before
            the first heading). The list is empty when ``text`` is blank.
        """
        logger.info(f"📝 Chunking text from {source}")

        # Clean text
        text = text.strip()
        if not text:
            logger.warning(f"⚠️ Empty text from {source}")
            return []

        chunks = []
        current_chunk = ""
        current_section = "General"

        # Split by paragraphs
        for para in text.split('\n\n'):
            para = para.strip()
            if not para:
                continue

            heading = self._detect_section(para)

            for piece in self._split_oversized(para):
                if not current_chunk:
                    current_chunk = piece
                elif len(current_chunk) + len(piece) + 2 <= self.chunk_size:
                    current_chunk = f"{current_chunk}\n\n{piece}"
                else:
                    # Flush with the section in effect *before* this
                    # paragraph, so a heading that opens the next chunk does
                    # not relabel the chunk that precedes it.
                    chunks.append(self._make_chunk(current_chunk, source, current_section))
                    tail = self._overlap_tail(current_chunk)
                    current_chunk = f"{tail}\n\n{piece}" if tail else piece

            if heading is not None:
                current_section = heading

        # Add final chunk
        if current_chunk.strip():
            chunks.append(self._make_chunk(current_chunk, source, current_section))

        logger.info(f"✅ Created {len(chunks)} chunks from {source}")
        return chunks
class VectorStoreBuilder:
    """Main vector store builder class.

    Scans a directory for PDFs, extracts and chunks their text, embeds the
    chunks with a SentenceTransformer model, builds a FAISS L2 index, writes
    the index plus JSON side files to ``output_dir`` and optionally uploads
    them to a Hugging Face dataset repository.
    """

    def __init__(
        self,
        input_dir: str,
        output_dir: str,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 100
    ):
        """Create the output directory and load the embedding model.

        Args:
            input_dir: Directory that is scanned (recursively) for PDFs.
            output_dir: Directory the vector store files are written to;
                created if missing.
            embedding_model: SentenceTransformer model name.
            chunk_size: Passed to ``MedicalChunker``.
            chunk_overlap: Passed to ``MedicalChunker``.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.embedding_model_name = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize components
        logger.info("🔧 Initializing vector store builder...")
        logger.info(f"📁 Input directory: {self.input_dir}")
        logger.info(f"📁 Output directory: {self.output_dir}")

        # Load embedding model
        logger.info(f"🤖 Loading embedding model: {self.embedding_model_name}")
        self.embedding_model = SentenceTransformer(self.embedding_model_name)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        logger.info(f"✅ Embedding dimension: {self.embedding_dim}")

        # Initialize chunker
        self.chunker = MedicalChunker(chunk_size, chunk_overlap)

        # Storage: three parallel lists, one entry per chunk
        self.documents = []
        self.embeddings = []
        self.metadata = []

    def scan_pdfs(self) -> List[Path]:
        """Scan input directory for PDF files.

        Returns:
            Every regular file below ``input_dir`` whose suffix is ``.pdf``
            in any letter case, sorted so the processing order (and hence
            the index order) is the same on every run.

        Raises:
            FileNotFoundError: If ``input_dir`` does not exist.
        """
        logger.info(f"🔍 Scanning for PDFs in {self.input_dir}")
        if not self.input_dir.exists():
            raise FileNotFoundError(f"Input directory not found: {self.input_dir}")

        pdf_files = sorted(
            path
            for path in self.input_dir.rglob("*")
            if path.is_file() and path.suffix.lower() == ".pdf"
        )
        logger.info(f"✅ Found {len(pdf_files)} PDF files")
        for pdf in pdf_files:
            logger.info(f" 📄 {pdf.name}")
        return pdf_files

    @staticmethod
    def _content_length(text: str) -> int:
        """Count extracted characters, ignoring ``--- Page N ---`` marker lines."""
        return sum(
            len(line.strip())
            for line in text.splitlines()
            if not line.startswith("--- Page ")
        )

    def process_pdf(self, pdf_path: Path) -> int:
        """Process a single PDF file.

        Extracts the text, chunks it, embeds the chunks and appends them to
        ``self.documents``, ``self.embeddings`` and ``self.metadata``.

        Args:
            pdf_path: PDF file to process.

        Returns:
            Number of chunks added; ``0`` when the file was skipped or any
            step failed (failures are logged, not raised, so that one bad
            file does not abort the whole build).
        """
        logger.info(f"\n{'='*60}")
        logger.info(f"📄 Processing: {pdf_path.name}")
        logger.info(f"{'='*60}")
        try:
            # Extract text
            text, extraction_metadata = PDFExtractor.extract_text(str(pdf_path))

            # Page markers alone can exceed 100 characters, so measure only
            # the real content; otherwise text-less (scanned) PDFs slip in.
            content_length = self._content_length(text) if text else 0
            if content_length < 100:
                logger.warning(f"⚠️ Extracted text too short ({content_length} chars), skipping")
                return 0

            # Content hash recorded in each chunk's metadata; this builder
            # does not itself de-duplicate on it.
            file_hash = hashlib.md5(text.encode()).hexdigest()

            # Chunk text
            chunks = self.chunker.chunk_text(text, pdf_path.name)
            if not chunks:
                logger.warning(f"⚠️ No chunks created from {pdf_path.name}")
                return 0

            # Generate embeddings
            logger.info(f"🧮 Generating embeddings for {len(chunks)} chunks...")
            chunk_texts = [chunk["content"] for chunk in chunks]
            chunk_embeddings = self.embedding_model.encode(
                chunk_texts,
                show_progress_bar=True,
                batch_size=32
            )

            # Store documents and embeddings
            for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
                self.documents.append(chunk["content"])
                self.embeddings.append(embedding)
                self.metadata.append({
                    "source": pdf_path.name,
                    "section": chunk["section"],
                    "chunk_id": i,  # index of the chunk within this file
                    "chunk_size": chunk["size"],
                    "file_hash": file_hash,
                    "extraction_method": extraction_metadata["method"],
                    "total_pages": extraction_metadata["pages"],
                    "processed_at": datetime.now().isoformat()
                })

            logger.info(f"✅ Processed {pdf_path.name}: {len(chunks)} chunks added")
            return len(chunks)
        except Exception as e:
            logger.error(f"❌ Error processing {pdf_path.name}: {e}")
            return 0

    def build_faiss_index(self):
        """Build FAISS index from embeddings.

        Returns:
            A ``faiss.IndexFlatL2`` holding every vector in
            ``self.embeddings``, in insertion order.

        Raises:
            ValueError: If no embeddings have been collected.
        """
        logger.info(f"\n{'='*60}")
        logger.info("🏗️ Building FAISS index...")
        logger.info(f"{'='*60}")
        if not self.embeddings:
            raise ValueError("No embeddings to index")

        # Convert to a single float32 matrix
        embeddings_array = np.asarray(self.embeddings, dtype='float32')
        logger.info(f"📊 Embeddings shape: {embeddings_array.shape}")

        # Create FAISS index (L2 distance)
        index = faiss.IndexFlatL2(self.embedding_dim)

        # Add embeddings
        index.add(embeddings_array)
        logger.info(f"✅ FAISS index created with {index.ntotal} vectors")
        return index

    @staticmethod
    def _write_json(path: Path, payload, **dump_kwargs) -> None:
        """Write ``payload`` to ``path`` as UTF-8 JSON with 2-space indentation."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, **dump_kwargs)

    def save_vector_store(self, index):
        """Save vector store to disk.

        Writes ``faiss_index.bin``, ``documents.json``, ``metadata.json``,
        ``config.json`` and ``build_log.json`` into ``output_dir``.

        Args:
            index: FAISS index to serialise.
        """
        logger.info(f"\n{'='*60}")
        logger.info("💾 Saving vector store...")
        logger.info(f"{'='*60}")

        # Save FAISS index
        index_path = self.output_dir / "faiss_index.bin"
        faiss.write_index(index, str(index_path))
        logger.info(f"✅ Saved FAISS index: {index_path}")

        # Save documents
        docs_path = self.output_dir / "documents.json"
        self._write_json(docs_path, self.documents, ensure_ascii=False)
        logger.info(f"✅ Saved documents: {docs_path}")

        # Save metadata
        metadata_path = self.output_dir / "metadata.json"
        self._write_json(metadata_path, self.metadata, ensure_ascii=False)
        logger.info(f"✅ Saved metadata: {metadata_path}")

        # Save configuration
        config = {
            "embedding_model": self.embedding_model_name,
            "embedding_dim": self.embedding_dim,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            # Both counts are the number of entries in documents.json
            # (one entry per chunk).
            "total_documents": len(self.documents),
            "total_chunks": len(self.documents),
            "build_date": datetime.now().isoformat(),
            "version": "1.0.0"
        }
        config_path = self.output_dir / "config.json"
        self._write_json(config_path, config)
        logger.info(f"✅ Saved config: {config_path}")

        # Save build log; sources are sorted so the file is reproducible
        sources = sorted({m["source"] for m in self.metadata})
        log_data = {
            "build_date": datetime.now().isoformat(),
            "input_dir": str(self.input_dir),
            "output_dir": str(self.output_dir),
            "total_pdfs": len(sources),
            "total_chunks": len(self.documents),
            "sources": sources,
            "config": config
        }
        log_path = self.output_dir / "build_log.json"
        self._write_json(log_path, log_data)
        logger.info(f"✅ Saved build log: {log_path}")

    def upload_to_hf(self, repo_id: str, token: Optional[str] = None):
        """Upload vector store to Hugging Face Hub.

        Does nothing (apart from a warning) when ``huggingface_hub`` could
        not be imported.

        Args:
            repo_id: Dataset repository ID, e.g. ``username/repo-name``.
            token: Hugging Face API token, or ``None``.

        Raises:
            Exception: Any upload error is logged and re-raised. A failure
                to create the repository is only logged as a warning.
        """
        if not HAS_HF:
            logger.warning("⚠️ Hugging Face Hub not available, skipping upload")
            return

        logger.info(f"\n{'='*60}")
        logger.info("☁️ Uploading to Hugging Face Hub...")
        logger.info(f"📦 Repository: {repo_id}")
        logger.info(f"{'='*60}")
        try:
            api = HfApi(token=token)

            # Create repo if it doesn't exist
            try:
                create_repo(repo_id, repo_type="dataset", exist_ok=True, token=token)
                logger.info(f"✅ Repository ready: {repo_id}")
            except Exception as e:
                logger.warning(f"⚠️ Repo creation: {e}")

            # Upload all files
            files_to_upload = [
                "faiss_index.bin",
                "documents.json",
                "metadata.json",
                "config.json",
                "build_log.json"
            ]
            for filename in files_to_upload:
                file_path = self.output_dir / filename
                if not file_path.exists():
                    # Make the gap visible instead of skipping silently.
                    logger.warning(f"⚠️ {filename} not found in {self.output_dir}, skipping")
                    continue
                logger.info(f"📤 Uploading {filename}...")
                api.upload_file(
                    path_or_fileobj=str(file_path),
                    path_in_repo=filename,
                    repo_id=repo_id,
                    repo_type="dataset",
                    token=token
                )
                logger.info(f"✅ Uploaded {filename}")

            logger.info(f"🎉 Upload complete! View at: https://huggingface.co/datasets/{repo_id}")
        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            raise

    def build(self, upload: bool = False, repo_id: Optional[str] = None, hf_token: Optional[str] = None):
        """Main build process.

        Args:
            upload: Upload the result to Hugging Face Hub after saving.
            repo_id: Target repository; required for the upload to happen.
            hf_token: Hugging Face API token used for the upload.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: If no PDFs are found or no chunks could be created.
            Exception: Any other failure is logged and re-raised.
        """
        start_time = datetime.now()
        logger.info(f"\n{'='*60}")
        logger.info("🚀 STARTING VECTOR STORE BUILD")
        logger.info(f"{'='*60}\n")
        try:
            # Scan for PDFs
            pdf_files = self.scan_pdfs()
            if not pdf_files:
                raise ValueError("No PDF files found in input directory")

            # Process each PDF
            total_chunks = 0
            processed_pdfs = 0
            for pdf_path in pdf_files:
                chunks_added = self.process_pdf(pdf_path)
                total_chunks += chunks_added
                if chunks_added:
                    processed_pdfs += 1
            if total_chunks == 0:
                raise ValueError("No chunks created from any PDF")

            # Build FAISS index
            index = self.build_faiss_index()

            # Save to disk
            self.save_vector_store(index)

            # Upload to HF if requested
            if upload and repo_id:
                self.upload_to_hf(repo_id, hf_token)
            elif upload:
                logger.warning("⚠️ Upload requested but no repo_id given, skipping upload")

            # Summary
            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"\n{'='*60}")
            logger.info("✅ BUILD COMPLETE!")
            logger.info(f"{'='*60}")
            logger.info("📊 Summary:")
            # Report files that actually contributed chunks, not every file found.
            logger.info(f" • PDFs processed: {processed_pdfs} of {len(pdf_files)}")
            logger.info(f" • Total chunks: {total_chunks}")
            logger.info(f" • Embedding dimension: {self.embedding_dim}")
            logger.info(f" • Output directory: {self.output_dir}")
            logger.info(f" • Build time: {duration:.2f} seconds")
            logger.info(f"{'='*60}\n")
            return True
        except Exception as e:
            logger.error(f"\n{'='*60}")
            logger.error(f"❌ BUILD FAILED: {e}")
            logger.error(f"{'='*60}\n")
            raise
def main():
    """Command-line entry point: parse the arguments and run the build."""
    cli = argparse.ArgumentParser(
        description="Build VedaMD Vector Store from PDF documents",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Build locally
python scripts/build_vector_store.py --input-dir ./Obs --output-dir ./data/vector_store
# Build and upload to HF
python scripts/build_vector_store.py \\
--input-dir ./Obs \\
--output-dir ./data/vector_store \\
--repo-id sniro23/VedaMD-Vector-Store \\
--upload
"""
    )

    # One (flag, add_argument keyword arguments) pair per option, registered
    # in the order they should appear in --help.
    option_table = (
        ("--input-dir", {
            "type": str,
            "required": True,
            "help": "Directory containing PDF files",
        }),
        ("--output-dir", {
            "type": str,
            "default": "./data/vector_store",
            "help": "Output directory for vector store files",
        }),
        ("--embedding-model", {
            "type": str,
            "default": "sentence-transformers/all-MiniLM-L6-v2",
            "help": "Sentence transformer model for embeddings",
        }),
        ("--chunk-size", {
            "type": int,
            "default": 1000,
            "help": "Maximum chunk size in characters",
        }),
        ("--chunk-overlap", {
            "type": int,
            "default": 100,
            "help": "Overlap between chunks in characters",
        }),
        ("--upload", {
            "action": "store_true",
            "help": "Upload to Hugging Face Hub after building",
        }),
        ("--repo-id", {
            "type": str,
            "help": "Hugging Face repository ID (e.g., username/repo-name)",
        }),
        ("--hf-token", {
            "type": str,
            "help": "Hugging Face API token (or set HF_TOKEN env var)",
        }),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    args = cli.parse_args()

    # An upload needs a target repository; error() prints usage and exits.
    if args.upload and not args.repo_id:
        cli.error("--repo-id is required when --upload is specified")

    # Command-line token wins; otherwise fall back to the environment.
    hf_token = args.hf_token if args.hf_token else os.getenv("HF_TOKEN")

    # Build vector store
    store_builder = VectorStoreBuilder(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        embedding_model=args.embedding_model,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap
    )
    store_builder.build(
        upload=args.upload,
        repo_id=args.repo_id,
        hf_token=hf_token
    )


if __name__ == "__main__":
    main()