#!/usr/bin/env python3
"""
Automated Vector Store Builder for VedaMD
==========================================

This script automates the complete vector store creation process:

1. Scans a directory for PDF documents
2. Extracts text using the best available method (PyMuPDF → pdfplumber)
3. Smart chunking with medical section awareness
4. Batch embedding generation
5. FAISS index creation
6. Metadata generation (sources, sections, extraction details)
7. Automatic Hugging Face Hub upload
8. Configuration file generation

Usage:
    python scripts/build_vector_store.py \\
        --input-dir ./Obs \\
        --output-dir ./data/vector_store \\
        --repo-id sniro23/VedaMD-Vector-Store \\
        --upload

Author: VedaMD Team
Date: October 22, 2025
Version: 1.0.0
"""
import os
import sys
import json
import hashlib
import logging
import argparse
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import warnings
# PDF processing
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False
    warnings.warn("PyMuPDF not available. Install with: pip install PyMuPDF")

try:
    import pdfplumber
    HAS_PDFPLUMBER = True
except ImportError:
    HAS_PDFPLUMBER = False
    warnings.warn("pdfplumber not available. Install with: pip install pdfplumber")
# Embeddings and vector store
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
    HAS_EMBEDDINGS = True
except ImportError:
    HAS_EMBEDDINGS = False
    raise ImportError("Required packages not installed. Run: pip install sentence-transformers faiss-cpu numpy")
# Hugging Face Hub
try:
    from huggingface_hub import HfApi, create_repo
    HAS_HF = True
except ImportError:
    HAS_HF = False
    warnings.warn("Hugging Face Hub not available. Install with: pip install huggingface-hub")
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('vector_store_build.log')
    ]
)
logger = logging.getLogger(__name__)
class PDFExtractor:
    """Handles PDF text extraction with multiple fallback methods"""

    @staticmethod
    def extract_with_pymupdf(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using PyMuPDF (fastest, most reliable)"""
        if not HAS_PYMUPDF:
            raise ImportError("PyMuPDF not available")

        logger.info(f"📄 Extracting with PyMuPDF: {pdf_path}")
        text = ""
        metadata = {"method": "pymupdf", "pages": 0}

        try:
            doc = fitz.open(pdf_path)
            metadata["pages"] = len(doc)
            metadata["title"] = doc.metadata.get("title", "")
            metadata["author"] = doc.metadata.get("author", "")

            for page_num, page in enumerate(doc, 1):
                page_text = page.get_text()
                text += f"\n--- Page {page_num} ---\n{page_text}"

            doc.close()
            logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
            return text, metadata
        except Exception as e:
            logger.error(f"❌ PyMuPDF extraction failed: {e}")
            raise

    @staticmethod
    def extract_with_pdfplumber(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using pdfplumber (better table handling)"""
        if not HAS_PDFPLUMBER:
            raise ImportError("pdfplumber not available")

        logger.info(f"📄 Extracting with pdfplumber: {pdf_path}")
        text = ""
        metadata = {"method": "pdfplumber", "pages": 0}

        try:
            with pdfplumber.open(pdf_path) as pdf:
                metadata["pages"] = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text() or ""
                    text += f"\n--- Page {page_num} ---\n{page_text}"

            logger.info(f"✅ Extracted {len(text)} characters from {metadata['pages']} pages")
            return text, metadata
        except Exception as e:
            logger.error(f"❌ pdfplumber extraction failed: {e}")
            raise

    @staticmethod
    def extract_text(pdf_path: str) -> Tuple[str, Dict]:
        """Extract text using the best available method with fallbacks"""
        errors = []

        # Try PyMuPDF first (fastest)
        if HAS_PYMUPDF:
            try:
                return PDFExtractor.extract_with_pymupdf(pdf_path)
            except Exception as e:
                errors.append(f"PyMuPDF: {e}")
                logger.warning("⚠️ PyMuPDF failed, trying pdfplumber...")

        # Fall back to pdfplumber
        if HAS_PDFPLUMBER:
            try:
                return PDFExtractor.extract_with_pdfplumber(pdf_path)
            except Exception as e:
                errors.append(f"pdfplumber: {e}")
                logger.warning("⚠️ pdfplumber failed")

        # If all methods fail
        raise RuntimeError(f"All extraction methods failed: {'; '.join(errors)}")
class MedicalChunker:
    """Smart chunking with medical section awareness"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Medical section headers to preserve
        self.section_markers = [
            "INTRODUCTION", "BACKGROUND", "DEFINITION", "EPIDEMIOLOGY",
            "PATHOPHYSIOLOGY", "CLINICAL FEATURES", "DIAGNOSIS", "MANAGEMENT",
            "TREATMENT", "PREVENTION", "COMPLICATIONS", "PROGNOSIS",
            "REFERENCES", "GUIDELINES", "PROTOCOL", "RECOMMENDATIONS"
        ]

    def chunk_text(self, text: str, source: str) -> List[Dict]:
        """Split text into chunks while preserving medical sections"""
        logger.info(f"📝 Chunking text from {source}")

        # Clean text
        text = text.strip()
        if not text:
            logger.warning(f"⚠️ Empty text from {source}")
            return []

        chunks = []
        current_chunk = ""
        current_section = "General"

        # Split by paragraphs
        paragraphs = text.split('\n\n')

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # Check if paragraph is a section header
            para_upper = para.upper()
            for marker in self.section_markers:
                if marker in para_upper and len(para) < 100:
                    current_section = para
                    break

            # Add paragraph to current chunk
            if len(current_chunk) + len(para) + 2 <= self.chunk_size:
                current_chunk += f"\n\n{para}"
            else:
                # Save current chunk
                if current_chunk.strip():
                    chunks.append({
                        "content": current_chunk.strip(),
                        "source": source,
                        "section": current_section,
                        "size": len(current_chunk)
                    })

                # Start new chunk with overlap
                if self.chunk_overlap > 0:
                    # Keep last few sentences for context
                    sentences = current_chunk.split('. ')
                    overlap_text = '. '.join(sentences[-2:]) if len(sentences) > 1 else ""
                    current_chunk = f"{overlap_text}\n\n{para}"
                else:
                    current_chunk = para

        # Add final chunk
        if current_chunk.strip():
            chunks.append({
                "content": current_chunk.strip(),
                "source": source,
                "section": current_section,
                "size": len(current_chunk)
            })

        logger.info(f"✅ Created {len(chunks)} chunks from {source}")
        return chunks
class VectorStoreBuilder:
    """Main vector store builder class"""

    def __init__(
        self,
        input_dir: str,
        output_dir: str,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 100
    ):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.embedding_model_name = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize components
        logger.info("🔧 Initializing vector store builder...")
        logger.info(f"📁 Input directory: {self.input_dir}")
        logger.info(f"📁 Output directory: {self.output_dir}")

        # Load embedding model
        logger.info(f"🤖 Loading embedding model: {self.embedding_model_name}")
        self.embedding_model = SentenceTransformer(self.embedding_model_name)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        logger.info(f"✅ Embedding dimension: {self.embedding_dim}")

        # Initialize chunker
        self.chunker = MedicalChunker(chunk_size, chunk_overlap)

        # Storage
        self.documents = []
        self.embeddings = []
        self.metadata = []

    def scan_pdfs(self) -> List[Path]:
        """Scan the input directory for PDF files"""
        logger.info(f"🔍 Scanning for PDFs in {self.input_dir}")

        if not self.input_dir.exists():
            raise FileNotFoundError(f"Input directory not found: {self.input_dir}")

        pdf_files = list(self.input_dir.glob("**/*.pdf"))
        logger.info(f"✅ Found {len(pdf_files)} PDF files")

        for pdf in pdf_files:
            logger.info(f"  📄 {pdf.name}")

        return pdf_files
    def process_pdf(self, pdf_path: Path) -> int:
        """Process a single PDF file"""
        logger.info(f"\n{'='*60}")
        logger.info(f"📄 Processing: {pdf_path.name}")
        logger.info(f"{'='*60}")

        try:
            # Extract text
            text, extraction_metadata = PDFExtractor.extract_text(str(pdf_path))

            if not text or len(text) < 100:
                logger.warning(f"⚠️ Extracted text too short ({len(text)} chars), skipping")
                return 0

            # Generate file hash for duplicate detection
            file_hash = hashlib.md5(text.encode()).hexdigest()

            # Chunk text
            chunks = self.chunker.chunk_text(text, pdf_path.name)
            if not chunks:
                logger.warning(f"⚠️ No chunks created from {pdf_path.name}")
                return 0

            # Generate embeddings
            logger.info(f"🧮 Generating embeddings for {len(chunks)} chunks...")
            chunk_texts = [chunk["content"] for chunk in chunks]
            chunk_embeddings = self.embedding_model.encode(
                chunk_texts,
                show_progress_bar=True,
                batch_size=32
            )

            # Store documents and embeddings
            for i, (chunk, embedding) in enumerate(zip(chunks, chunk_embeddings)):
                self.documents.append(chunk["content"])
                self.embeddings.append(embedding)
                self.metadata.append({
                    "source": pdf_path.name,
                    "section": chunk["section"],
                    "chunk_id": i,
                    "chunk_size": chunk["size"],
                    "file_hash": file_hash,
                    "extraction_method": extraction_metadata["method"],
                    "total_pages": extraction_metadata["pages"],
                    "processed_at": datetime.now().isoformat()
                })

            logger.info(f"✅ Processed {pdf_path.name}: {len(chunks)} chunks added")
            return len(chunks)
        except Exception as e:
            logger.error(f"❌ Error processing {pdf_path.name}: {e}")
            return 0
    def build_faiss_index(self):
        """Build FAISS index from embeddings"""
        logger.info(f"\n{'='*60}")
        logger.info("🏗️ Building FAISS index...")
        logger.info(f"{'='*60}")

        if not self.embeddings:
            raise ValueError("No embeddings to index")

        # Convert to numpy array
        embeddings_array = np.array(self.embeddings).astype('float32')
        logger.info(f"📊 Embeddings shape: {embeddings_array.shape}")

        # Create FAISS index (L2 distance)
        index = faiss.IndexFlatL2(self.embedding_dim)

        # Add embeddings
        index.add(embeddings_array)
        logger.info(f"✅ FAISS index created with {index.ntotal} vectors")

        return index
    def save_vector_store(self, index):
        """Save vector store to disk"""
        logger.info(f"\n{'='*60}")
        logger.info("💾 Saving vector store...")
        logger.info(f"{'='*60}")

        # Save FAISS index
        index_path = self.output_dir / "faiss_index.bin"
        faiss.write_index(index, str(index_path))
        logger.info(f"✅ Saved FAISS index: {index_path}")

        # Save documents
        docs_path = self.output_dir / "documents.json"
        with open(docs_path, 'w', encoding='utf-8') as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved documents: {docs_path}")

        # Save metadata
        metadata_path = self.output_dir / "metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        logger.info(f"✅ Saved metadata: {metadata_path}")

        # Save configuration
        config = {
            "embedding_model": self.embedding_model_name,
            "embedding_dim": self.embedding_dim,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            # Number of distinct source PDFs vs. total chunks indexed
            "total_documents": len(set(m["source"] for m in self.metadata)),
            "total_chunks": len(self.documents),
            "build_date": datetime.now().isoformat(),
            "version": "1.0.0"
        }
        config_path = self.output_dir / "config.json"
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        logger.info(f"✅ Saved config: {config_path}")

        # Save build log
        log_data = {
            "build_date": datetime.now().isoformat(),
            "input_dir": str(self.input_dir),
            "output_dir": str(self.output_dir),
            "total_pdfs": len(set(m["source"] for m in self.metadata)),
            "total_chunks": len(self.documents),
            "sources": list(set(m["source"] for m in self.metadata)),
            "config": config
        }
        log_path = self.output_dir / "build_log.json"
        with open(log_path, 'w', encoding='utf-8') as f:
            json.dump(log_data, f, indent=2)
        logger.info(f"✅ Saved build log: {log_path}")
    def upload_to_hf(self, repo_id: str, token: Optional[str] = None):
        """Upload vector store to Hugging Face Hub"""
        if not HAS_HF:
            logger.warning("⚠️ Hugging Face Hub not available, skipping upload")
            return

        logger.info(f"\n{'='*60}")
        logger.info("☁️ Uploading to Hugging Face Hub...")
        logger.info(f"📦 Repository: {repo_id}")
        logger.info(f"{'='*60}")

        try:
            api = HfApi(token=token)

            # Create repo if it doesn't exist
            try:
                create_repo(repo_id, repo_type="dataset", exist_ok=True, token=token)
                logger.info(f"✅ Repository ready: {repo_id}")
            except Exception as e:
                logger.warning(f"⚠️ Repo creation: {e}")

            # Upload all files
            files_to_upload = [
                "faiss_index.bin",
                "documents.json",
                "metadata.json",
                "config.json",
                "build_log.json"
            ]

            for filename in files_to_upload:
                file_path = self.output_dir / filename
                if file_path.exists():
                    logger.info(f"📤 Uploading {filename}...")
                    api.upload_file(
                        path_or_fileobj=str(file_path),
                        path_in_repo=filename,
                        repo_id=repo_id,
                        repo_type="dataset",
                        token=token
                    )
                    logger.info(f"✅ Uploaded {filename}")

            logger.info(f"🎉 Upload complete! View at: https://huggingface.co/datasets/{repo_id}")
        except Exception as e:
            logger.error(f"❌ Upload failed: {e}")
            raise
    def build(self, upload: bool = False, repo_id: Optional[str] = None, hf_token: Optional[str] = None):
        """Main build process"""
        start_time = datetime.now()

        logger.info(f"\n{'='*60}")
        logger.info("🚀 STARTING VECTOR STORE BUILD")
        logger.info(f"{'='*60}\n")

        try:
            # Scan for PDFs
            pdf_files = self.scan_pdfs()
            if not pdf_files:
                raise ValueError("No PDF files found in input directory")

            # Process each PDF
            total_chunks = 0
            for pdf_path in pdf_files:
                chunks_added = self.process_pdf(pdf_path)
                total_chunks += chunks_added

            if total_chunks == 0:
                raise ValueError("No chunks created from any PDF")

            # Build FAISS index
            index = self.build_faiss_index()

            # Save to disk
            self.save_vector_store(index)

            # Upload to HF if requested
            if upload and repo_id:
                self.upload_to_hf(repo_id, hf_token)

            # Summary
            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"\n{'='*60}")
            logger.info("✅ BUILD COMPLETE!")
            logger.info(f"{'='*60}")
            logger.info("📊 Summary:")
            logger.info(f"  • PDFs processed: {len(pdf_files)}")
            logger.info(f"  • Total chunks: {total_chunks}")
            logger.info(f"  • Embedding dimension: {self.embedding_dim}")
            logger.info(f"  • Output directory: {self.output_dir}")
            logger.info(f"  • Build time: {duration:.2f} seconds")
            logger.info(f"{'='*60}\n")

            return True
        except Exception as e:
            logger.error(f"\n{'='*60}")
            logger.error(f"❌ BUILD FAILED: {e}")
            logger.error(f"{'='*60}\n")
            raise
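# --------------------------------------------------------------------------
# Example retrieval (a minimal sketch, not used by the CLI below): shows how a
# store written by save_vector_store() could be loaded and queried. The helper
# name `example_query_store` is illustrative; it assumes the files and
# config.json fields produced above and the same embedding model.
# --------------------------------------------------------------------------
def example_query_store(store_dir: str, query: str, top_k: int = 5) -> List[Dict]:
    """Load a saved vector store and return the top_k closest chunks."""
    store_path = Path(store_dir)

    # Load the artifacts written by VectorStoreBuilder.save_vector_store()
    index = faiss.read_index(str(store_path / "faiss_index.bin"))
    with open(store_path / "documents.json", encoding="utf-8") as f:
        documents = json.load(f)
    with open(store_path / "metadata.json", encoding="utf-8") as f:
        metadata = json.load(f)
    with open(store_path / "config.json", encoding="utf-8") as f:
        config = json.load(f)

    # Embed the query with the same model used at build time
    model = SentenceTransformer(config["embedding_model"])
    query_vec = model.encode([query]).astype("float32")

    # Search the IndexFlatL2 index; lower distance means a closer match
    distances, indices = index.search(query_vec, top_k)

    results = []
    for dist, idx in zip(distances[0], indices[0]):
        if idx == -1:  # FAISS returns -1 when fewer than top_k vectors exist
            continue
        results.append({
            "content": documents[idx],
            "metadata": metadata[idx],
            "distance": float(dist)
        })
    return results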
def main():
    parser = argparse.ArgumentParser(
        description="Build VedaMD Vector Store from PDF documents",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Build locally
  python scripts/build_vector_store.py --input-dir ./Obs --output-dir ./data/vector_store

  # Build and upload to HF
  python scripts/build_vector_store.py \\
      --input-dir ./Obs \\
      --output-dir ./data/vector_store \\
      --repo-id sniro23/VedaMD-Vector-Store \\
      --upload
        """
    )
    parser.add_argument(
        "--input-dir",
        type=str,
        required=True,
        help="Directory containing PDF files"
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./data/vector_store",
        help="Output directory for vector store files"
    )
    parser.add_argument(
        "--embedding-model",
        type=str,
        default="sentence-transformers/all-MiniLM-L6-v2",
        help="Sentence transformer model for embeddings"
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=1000,
        help="Maximum chunk size in characters"
    )
    parser.add_argument(
        "--chunk-overlap",
        type=int,
        default=100,
        help="Overlap between chunks in characters"
    )
    parser.add_argument(
        "--upload",
        action="store_true",
        help="Upload to Hugging Face Hub after building"
    )
    parser.add_argument(
        "--repo-id",
        type=str,
        help="Hugging Face repository ID (e.g., username/repo-name)"
    )
    parser.add_argument(
        "--hf-token",
        type=str,
        help="Hugging Face API token (or set HF_TOKEN env var)"
    )

    args = parser.parse_args()

    # Get HF token from env if not provided
    hf_token = args.hf_token or os.getenv("HF_TOKEN")

    # Validate upload arguments
    if args.upload and not args.repo_id:
        parser.error("--repo-id is required when --upload is specified")

    # Build vector store
    builder = VectorStoreBuilder(
        input_dir=args.input_dir,
        output_dir=args.output_dir,
        embedding_model=args.embedding_model,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap
    )
    builder.build(
        upload=args.upload,
        repo_id=args.repo_id,
        hf_token=hf_token
    )


if __name__ == "__main__":
    main()