| """ANNOY index management for PDF-based RAG system.""" | |
| import os | |
| import json | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple, Union | |
| from pathlib import Path | |
| import logging | |
| try: | |
| from annoy import AnnoyIndex | |
| except ImportError: | |
| raise ImportError("annoy package is required. Install with: pip install annoy") | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |


class AnnoyIndexManager:
    """Manages ANNOY indices for fast vector similarity search."""

    def __init__(self, embedding_dim: int = 1024, metric: str = 'angular'):
        """
        Initialize ANNOY index manager.

        Args:
            embedding_dim: Dimension of embeddings (1024 for BGE Large Medical)
            metric: Distance metric ('angular' for cosine similarity,
                'euclidean', 'manhattan', 'hamming', 'dot')
        """
        self.embedding_dim = embedding_dim
        self.metric = metric
        self.tag_index = None
        self.chunk_index = None
        self.tag_to_id_mapping = {}
        self.id_to_tag_mapping = {}
        self.chunk_to_id_mapping = {}
        self.id_to_chunk_mapping = {}
        logger.info(f"Initialized AnnoyIndexManager: dim={embedding_dim}, metric={metric}")

    def build_tag_index(self, tag_embeddings: Dict[str, np.ndarray], n_trees: int = 50) -> AnnoyIndex:
        """
        Build ANNOY index for tag embeddings.

        Args:
            tag_embeddings: Dictionary mapping tags to their embeddings
            n_trees: Number of trees (more trees = better precision, slower build)

        Returns:
            Built ANNOY index
        """
        logger.info(f"Building tag ANNOY index with {len(tag_embeddings)} tags...")

        # Create index
        self.tag_index = AnnoyIndex(self.embedding_dim, self.metric)

        # Create mappings
        self.tag_to_id_mapping = {}
        self.id_to_tag_mapping = {}

        # Add embeddings to index
        for tag_id, (tag, embedding) in enumerate(tag_embeddings.items()):
            self.tag_index.add_item(tag_id, embedding)
            self.tag_to_id_mapping[tag] = tag_id
            self.id_to_tag_mapping[tag_id] = tag

        # Build index
        logger.info(f"Building index with {n_trees} trees...")
        self.tag_index.build(n_trees)

        logger.info(f"✅ Tag ANNOY index built successfully: {len(tag_embeddings)} tags")
        return self.tag_index

    def build_chunk_index(self, chunk_embeddings: Dict[str, List[Dict]], n_trees: int = 50) -> AnnoyIndex:
        """
        Build ANNOY index for chunk embeddings.

        Args:
            chunk_embeddings: Dictionary mapping document names to lists of chunk dictionaries
            n_trees: Number of trees

        Returns:
            Built ANNOY index
        """
        # Count total chunks
        total_chunks = sum(len(chunks) for chunks in chunk_embeddings.values())
        logger.info(f"Building chunk ANNOY index with {total_chunks} chunks...")

        # Create index
        self.chunk_index = AnnoyIndex(self.embedding_dim, self.metric)

        # Create mappings
        self.chunk_to_id_mapping = {}
        self.id_to_chunk_mapping = {}

        chunk_id = 0
        for doc_name, chunks in chunk_embeddings.items():
            for chunk in chunks:
                # Create unique chunk identifier
                chunk_key = f"{doc_name}#{chunk['chunk_id']}"

                # Add to index
                self.chunk_index.add_item(chunk_id, chunk['embedding'])

                # Create mappings
                self.chunk_to_id_mapping[chunk_key] = chunk_id
                self.id_to_chunk_mapping[chunk_id] = {
                    'document': doc_name,
                    'chunk_id': chunk['chunk_id'],
                    'text': chunk['text'],
                    'start_char': chunk.get('start_char', 0),
                    'end_char': chunk.get('end_char', len(chunk['text'])),
                    'token_count': chunk.get('token_count', len(chunk['text'].split())),
                    'chunk_key': chunk_key
                }
                chunk_id += 1

        # Build index
        logger.info(f"Building chunk index with {n_trees} trees...")
        self.chunk_index.build(n_trees)

        logger.info(f"✅ Chunk ANNOY index built successfully: {total_chunks} chunks")
        return self.chunk_index

    def save_indices(self, output_dir: Union[str, Path]):
        """
        Save ANNOY indices and mappings to disk.

        Args:
            output_dir: Directory to save indices
        """
        output_dir = Path(output_dir)

        # Save indices at the same level as embeddings, not inside embeddings.
        # parents=True so a missing intermediate directory doesn't raise.
        indices_dir = output_dir.parent / 'indices'
        indices_dir.mkdir(parents=True, exist_ok=True)
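        # Expected on-disk layout after this method runs:
        #   <output_dir.parent>/indices/
        #       tag_embeddings.ann    tag_mappings.json
        #       chunk_embeddings.ann  chunk_mappings.json
        #       annoy_metadata.json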
        # Save tag index
        if self.tag_index is not None:
            tag_index_path = indices_dir / 'tag_embeddings.ann'
            self.tag_index.save(str(tag_index_path))

            # Save tag mappings
            tag_mappings_path = indices_dir / 'tag_mappings.json'
            with open(tag_mappings_path, 'w', encoding='utf-8') as f:
                json.dump({
                    'tag_to_id': self.tag_to_id_mapping,
                    'id_to_tag': self.id_to_tag_mapping
                }, f, indent=2, ensure_ascii=False)

            logger.info(f"✅ Tag index saved: {tag_index_path}")

        # Save chunk index
        if self.chunk_index is not None:
            chunk_index_path = indices_dir / 'chunk_embeddings.ann'
            self.chunk_index.save(str(chunk_index_path))

            # Save chunk mappings
            chunk_mappings_path = indices_dir / 'chunk_mappings.json'
            with open(chunk_mappings_path, 'w', encoding='utf-8') as f:
                json.dump({
                    'chunk_to_id': self.chunk_to_id_mapping,
                    'id_to_chunk': self.id_to_chunk_mapping
                }, f, indent=2, ensure_ascii=False)

            logger.info(f"✅ Chunk index saved: {chunk_index_path}")

        # Save index metadata
        metadata_path = indices_dir / 'annoy_metadata.json'
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump({
                'embedding_dim': self.embedding_dim,
                'metric': self.metric,
                'tag_index_exists': self.tag_index is not None,
                'chunk_index_exists': self.chunk_index is not None,
                'num_tags': len(self.tag_to_id_mapping),
                'num_chunks': len(self.chunk_to_id_mapping)
            }, f, indent=2)

        logger.info(f"✅ ANNOY indices saved to: {indices_dir}")

    def load_indices(self, input_dir: Union[str, Path]) -> bool:
        """
        Load ANNOY indices and mappings from disk.

        Args:
            input_dir: Directory containing saved indices

        Returns:
            True if successfully loaded, False otherwise
        """
        input_dir = Path(input_dir)

        # Load indices from the same level as embeddings, not inside embeddings
        indices_dir = input_dir.parent / 'indices'
        if not indices_dir.exists():
            logger.warning(f"Indices directory not found: {indices_dir}")
            return False

        try:
            # Load metadata
            metadata_path = indices_dir / 'annoy_metadata.json'
            if metadata_path.exists():
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
                self.embedding_dim = metadata['embedding_dim']
                self.metric = metadata['metric']
                logger.info(f"Loaded metadata: dim={self.embedding_dim}, metric={self.metric}")

            # Load tag index
            tag_index_path = indices_dir / 'tag_embeddings.ann'
            tag_mappings_path = indices_dir / 'tag_mappings.json'

            if tag_index_path.exists() and tag_mappings_path.exists():
                self.tag_index = AnnoyIndex(self.embedding_dim, self.metric)
                self.tag_index.load(str(tag_index_path))

                with open(tag_mappings_path, 'r', encoding='utf-8') as f:
                    mappings = json.load(f)
                self.tag_to_id_mapping = mappings['tag_to_id']
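                # JSON serializes dict keys as strings, so restore the
                # integer IDs before using them as lookup keys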
                self.id_to_tag_mapping = {int(k): v for k, v in mappings['id_to_tag'].items()}

                logger.info(f"✅ Tag index loaded: {len(self.tag_to_id_mapping)} tags")

            # Load chunk index
            chunk_index_path = indices_dir / 'chunk_embeddings.ann'
            chunk_mappings_path = indices_dir / 'chunk_mappings.json'

            if chunk_index_path.exists() and chunk_mappings_path.exists():
                self.chunk_index = AnnoyIndex(self.embedding_dim, self.metric)
                self.chunk_index.load(str(chunk_index_path))

                with open(chunk_mappings_path, 'r', encoding='utf-8') as f:
                    mappings = json.load(f)
                self.chunk_to_id_mapping = mappings['chunk_to_id']
                self.id_to_chunk_mapping = {int(k): v for k, v in mappings['id_to_chunk'].items()}

                logger.info(f"✅ Chunk index loaded: {len(self.chunk_to_id_mapping)} chunks")

            return True

        except Exception as e:
            logger.error(f"Failed to load ANNOY indices: {e}")
            return False

    def search_tags(self, query_embedding: np.ndarray, n_neighbors: int = 10,
                    include_distances: bool = True) -> Union[List[str], Tuple[List[str], List[float]]]:
        """
        Search for similar tags using ANNOY index.

        Args:
            query_embedding: Query embedding vector
            n_neighbors: Number of nearest neighbors to return
            include_distances: Whether to return distances

        Returns:
            List of tag names, or tuple of (tag_names, distances)
        """
        if self.tag_index is None:
            raise ValueError("Tag index not built or loaded")

        # Search using ANNOY
        if include_distances:
            neighbor_ids, distances = self.tag_index.get_nns_by_vector(
                query_embedding, n_neighbors, include_distances=True
            )
        else:
            neighbor_ids = self.tag_index.get_nns_by_vector(
                query_embedding, n_neighbors, include_distances=False
            )

        # Convert IDs to tag names
        tag_names = [self.id_to_tag_mapping[neighbor_id] for neighbor_id in neighbor_ids]

        if include_distances:
            return tag_names, distances
        else:
            return tag_names

    def search_chunks(self, query_embedding: np.ndarray, n_neighbors: int = 10,
                      include_distances: bool = True) -> Union[List[Dict], Tuple[List[Dict], List[float]]]:
        """
        Search for similar chunks using ANNOY index.

        Args:
            query_embedding: Query embedding vector
            n_neighbors: Number of nearest neighbors to return
            include_distances: Whether to return distances

        Returns:
            List of chunk dictionaries, or tuple of (chunks, distances)
        """
        if self.chunk_index is None:
            raise ValueError("Chunk index not built or loaded")

        # Search using ANNOY
        if include_distances:
            neighbor_ids, distances = self.chunk_index.get_nns_by_vector(
                query_embedding, n_neighbors, include_distances=True
            )
        else:
            neighbor_ids = self.chunk_index.get_nns_by_vector(
                query_embedding, n_neighbors, include_distances=False
            )

        # Convert IDs to chunk info
        chunks = [self.id_to_chunk_mapping[neighbor_id] for neighbor_id in neighbor_ids]

        if include_distances:
            return chunks, distances
        else:
            return chunks

    def search_chunks_in_documents(self, query_embedding: np.ndarray,
                                   document_names: List[str], n_neighbors: int = 10,
                                   include_distances: bool = True) -> Union[List[Dict], Tuple[List[Dict], List[float]]]:
        """
        Search for similar chunks within specific documents.

        Args:
            query_embedding: Query embedding vector
            document_names: List of document names to search within
            n_neighbors: Number of nearest neighbors to return
            include_distances: Whether to return distances

        Returns:
            List of chunk dictionaries, or tuple of (chunks, distances)
        """
        if self.chunk_index is None:
            raise ValueError("Chunk index not built or loaded")

        # Get more candidates than needed since we'll filter by document
        search_candidates = min(n_neighbors * 5, len(self.id_to_chunk_mapping))
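        # Note: 5x oversampling is a heuristic. If the target documents hold
        # only a small share of the index, fewer than n_neighbors matches may
        # survive the filter below; raise the multiplier if that becomes a problem.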
        # Search using ANNOY
        if include_distances:
            candidate_ids, distances = self.chunk_index.get_nns_by_vector(
                query_embedding, search_candidates, include_distances=True
            )
        else:
            candidate_ids = self.chunk_index.get_nns_by_vector(
                query_embedding, search_candidates, include_distances=False
            )

        # Filter by document names and take top n_neighbors
        allowed_documents = set(document_names)  # O(1) membership checks
        filtered_chunks = []
        filtered_distances = [] if include_distances else None

        for i, candidate_id in enumerate(candidate_ids):
            chunk_info = self.id_to_chunk_mapping[candidate_id]
            if chunk_info['document'] in allowed_documents:
                filtered_chunks.append(chunk_info)
                if include_distances:
                    filtered_distances.append(distances[i])
                if len(filtered_chunks) >= n_neighbors:
                    break

        if include_distances:
            return filtered_chunks, filtered_distances
        else:
            return filtered_chunks

    def get_index_stats(self) -> Dict:
        """Get statistics about the loaded indices."""
        stats = {
            'embedding_dim': self.embedding_dim,
            'metric': self.metric,
            'tag_index_loaded': self.tag_index is not None,
            'chunk_index_loaded': self.chunk_index is not None,
            'num_tags': len(self.tag_to_id_mapping) if self.tag_index else 0,
            'num_chunks': len(self.chunk_to_id_mapping) if self.chunk_index else 0
        }
        return stats


def convert_angular_distance_to_cosine_similarity(angular_distance: float) -> float:
    """
    Convert ANNOY angular distance to cosine similarity.

    Args:
        angular_distance: Angular distance from ANNOY

    Returns:
        Cosine similarity (-1 to 1)
    """
    # ANNOY's 'angular' metric is the Euclidean distance between
    # L2-normalized vectors: d = sqrt(2 * (1 - cos_sim)).
    # Solving for cos_sim: cos_sim = 1 - d**2 / 2
    # Sanity checks: d=0 -> 1.0 (identical), d=sqrt(2) -> 0.0 (orthogonal),
    # d=2 -> -1.0 (opposite).
    return 1.0 - (angular_distance ** 2) / 2.0
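

if __name__ == "__main__":
    # Minimal smoke test with synthetic data (a sketch, not the real
    # pipeline): the tag names, document name, and chunk texts below are
    # placeholders; real 1024-dim embeddings come from the upstream
    # BGE embedding step.
    rng = np.random.default_rng(42)
    dim = 1024
    manager = AnnoyIndexManager(embedding_dim=dim, metric='angular')

    # Build a tag index from three synthetic tag embeddings
    tag_embeddings = {
        tag: rng.standard_normal(dim).astype(np.float32)
        for tag in ('cardiology', 'oncology', 'neurology')
    }
    manager.build_tag_index(tag_embeddings, n_trees=10)

    # Build a chunk index for one synthetic document
    chunk_embeddings = {
        'demo.pdf': [
            {'chunk_id': i, 'text': f'chunk {i}',
             'embedding': rng.standard_normal(dim).astype(np.float32)}
            for i in range(5)
        ]
    }
    manager.build_chunk_index(chunk_embeddings, n_trees=10)

    # Query both indices; convert ANNOY distances to cosine similarities
    query = rng.standard_normal(dim).astype(np.float32)
    tags, tag_distances = manager.search_tags(query, n_neighbors=2)
    similarities = [convert_angular_distance_to_cosine_similarity(d) for d in tag_distances]
    print("Tags:", list(zip(tags, similarities)))

    chunks, _ = manager.search_chunks_in_documents(query, ['demo.pdf'], n_neighbors=3)
    print("Chunks:", [c['chunk_key'] for c in chunks])

    print("Stats:", manager.get_index_stats())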