| """ |
| Embedding generation supporting multiple model backends. |
| |
| This module provides efficient batch embedding generation with automatic |
| model loading, caching, and device management. Supports both SentenceTransformers |
| models and NVIDIA NV-Embed-v2. |
| """ |
|
|
| import numpy as np |
| import torch |
| from typing import List, Optional |
| from tqdm import tqdm |
| from src.config.settings import get_settings, get_embedding_model_config, EMBEDDING_MODELS |
| from src.utils.logging import get_logger, log_embedding_generation |
| from src.ingestion.models import Chunk |
| import time |
|
|
| logger = get_logger(__name__) |
|
|
|
|
class Embedder:
    """Generate embeddings using SentenceTransformers or NV-Embed-v2.

    The underlying model is loaded lazily on first access and cached for
    the lifetime of the instance. Batch encoding reports progress via tqdm.
    """

    # Model ids that require the transformers AutoModel path (NV-Embed exposes
    # a custom `encode` via trust_remote_code) instead of SentenceTransformers.
    NVEMBED_MODELS = ["nvidia/NV-Embed-v2", "nvidia/NV-Embed-v1"]

    def __init__(self, model_name: Optional[str] = None):
        """
        Initialize embedder with specified or default model.

        Args:
            model_name: Optional model identifier. If None, uses settings default.
        """
        settings = get_settings()
        self.model_name = model_name or settings.embedding_model
        self.device = settings.embedding_device

        try:
            model_config = get_embedding_model_config(self.model_name)
            self.batch_size = model_config.get("batch_size", settings.embedding_batch_size)
            self._dimensions = model_config.get("dimensions")
            self._max_length = model_config.get("max_length", 512)
        except ValueError:
            # Unknown/custom model id: fall back to global defaults. The
            # dimension is then discovered lazily in get_embedding_dimension().
            self.batch_size = settings.embedding_batch_size
            self._dimensions = None
            self._max_length = 512

        # Populated lazily by the `model` property.
        self._model = None
        self._tokenizer = None
        self._is_nvembed = self.model_name in self.NVEMBED_MODELS

    @property
    def model(self):
        """
        Lazy load the embedding model.

        The model is only loaded when first accessed, and then cached for reuse.

        Returns:
            Model instance (SentenceTransformer or transformers model)
        """
        if self._model is None:
            logger.info("Loading embedding model: %s", self.model_name)

            if self._is_nvembed:
                self._load_nvembed_model()
            else:
                self._load_sentence_transformer()

            logger.info("Model loaded on device: %s", self.device)
        return self._model

    def _load_sentence_transformer(self):
        """Load a SentenceTransformer model and move it to the target device."""
        from sentence_transformers import SentenceTransformer

        self._model = SentenceTransformer(self.model_name)
        self._model.to(self.device)

    def _load_nvembed_model(self):
        """Load NVIDIA NV-Embed model and tokenizer via transformers."""
        from transformers import AutoModel, AutoTokenizer

        logger.info("Loading NV-Embed-v2 (this may take a moment)...")

        # Half precision only on CUDA; MPS and CPU run in fp32 (the original
        # code special-cased mps to float32 explicitly for this reason).
        torch_dtype = torch.float16 if self.device == "cuda" else torch.float32

        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True
        )

        self._model = AutoModel.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            torch_dtype=torch_dtype,
        )
        self._model.to(self.device)
        self._model.eval()

    def _nvembed_encode(
        self,
        texts: List[str],
        instruction: str = "",
        max_length: Optional[int] = None,
    ) -> np.ndarray:
        """
        Encode texts using NV-Embed-v2's native encode method.

        Args:
            texts: List of texts to encode
            instruction: Instruction prefix for queries (empty for documents)
            max_length: Maximum sequence length (uses model config if None)

        Returns:
            np.ndarray: Embeddings array
        """
        if max_length is None:
            max_length = self._max_length

        all_embeddings = []

        for start in tqdm(range(0, len(texts), self.batch_size), desc="Embedding"):
            batch_texts = texts[start:start + self.batch_size]

            # NV-Embed's custom encode() defaults instruction to "", so a
            # single call handles both query (prefixed) and document modes.
            with torch.no_grad():
                embeddings = self._model.encode(
                    batch_texts,
                    instruction=instruction,
                    max_length=max_length,
                )

            # The remote-code encode() may return a tensor; normalize to numpy.
            if isinstance(embeddings, torch.Tensor):
                embeddings = embeddings.cpu().numpy()

            all_embeddings.append(embeddings)

        return np.vstack(all_embeddings)

    def encode_batch(self, chunks: List[Chunk]) -> np.ndarray:
        """
        Generate embeddings for a batch of chunks (documents).

        Processes chunks in smaller batches for memory efficiency and
        displays progress with tqdm.

        Args:
            chunks: List of chunks to embed

        Returns:
            np.ndarray: Array of embeddings with shape (num_chunks, embedding_dim).
                For empty input, returns an empty 1-D array (shape (0,)) since
                the embedding dimension is unknown without loading the model.
        """
        if not chunks:
            logger.warning("No chunks to embed")
            return np.array([])

        start_time = time.time()

        texts = [chunk.text for chunk in chunks]

        logger.info("Generating embeddings for %d chunks", len(chunks))

        if self._is_nvembed:
            # Access the property once to trigger the lazy load before encoding.
            _ = self.model
            embeddings = self._nvembed_encode(texts, instruction="")
        else:
            batch_outputs = []
            for start in tqdm(range(0, len(texts), self.batch_size), desc="Embedding chunks"):
                batch_texts = texts[start:start + self.batch_size]

                batch_outputs.append(self.model.encode(
                    batch_texts,
                    batch_size=self.batch_size,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                    normalize_embeddings=True
                ))

            embeddings = np.vstack(batch_outputs)

        duration = time.time() - start_time
        log_embedding_generation(logger, len(chunks), duration)

        return embeddings

    def encode_single(self, text: str, is_query: bool = False) -> np.ndarray:
        """
        Generate embedding for a single text.

        Args:
            text: Text to embed
            is_query: If True, applies query instruction (for NV-Embed)

        Returns:
            np.ndarray: Embedding vector
        """
        if not self._is_nvembed:
            # SentenceTransformers uses the same encoding for queries and
            # documents; normalization matches encode_batch().
            return self.model.encode(
                text,
                convert_to_numpy=True,
                normalize_embeddings=True
            )

        # Trigger lazy load before calling the private encode helper.
        _ = self.model
        instruction = (
            "Instruct: Given a question, retrieve passages that answer the question\nQuery: "
            if is_query else ""
        )
        embeddings = self._nvembed_encode([text], instruction=instruction)
        return embeddings[0]

    def get_embedding_dimension(self) -> int:
        """
        Get the dimension of embeddings produced by this model.

        Returns:
            int: Embedding dimension
        """
        # Prefer the configured dimension; this avoids loading the model.
        if self._dimensions is not None:
            return self._dimensions

        # No configured value: load the model and ask it directly.
        _ = self.model

        if self._is_nvembed:
            return 4096  # NV-Embed-v2 output size (matches model config)
        return self._model.get_sentence_embedding_dimension()

    def get_model_info(self) -> dict:
        """
        Get information about the current embedding model.

        Returns:
            dict: Model information including name, dimensions, etc.
        """
        # Keep the try body minimal: only the config lookup raises ValueError.
        try:
            config = get_embedding_model_config(self.model_name)
        except ValueError:
            # Model id not in the registry: synthesize a best-effort entry.
            return {
                "id": self.model_name,
                "name": self.model_name.split("/")[-1],
                "dimensions": self.get_embedding_dimension(),
                "type": "unknown",
                "description": "Custom model",
            }

        return {
            "id": self.model_name,
            "name": config.get("name", self.model_name),
            "dimensions": self.get_embedding_dimension(),
            "type": config.get("type", "unknown"),
            "description": config.get("description", ""),
        }
|
|