| """Context retrieval with reranking capabilities.""" | |
| import os | |
| from typing import List, Optional, Tuple, Dict, Any | |
| from langchain.schema import Document | |
| from langchain_community.vectorstores import Qdrant | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from sentence_transformers import CrossEncoder | |
| import numpy as np | |
| import torch | |
| from qdrant_client.http import models as rest | |
| import traceback | |
| from .filter import create_filter | |


class ContextRetriever:
    """
    Context retriever for hybrid search with optional filtering and reranking.
    """

    def __init__(self, vectorstore: Qdrant, config: Optional[dict] = None):
        """
        Initialize the context retriever.

        Args:
            vectorstore: Qdrant vector store instance
            config: Configuration dictionary
        """
        self.vectorstore = vectorstore
        self.config = config or {}
        self.reranker = None

        # BM25 attributes
        self.bm25_vectorizer = None
        self.bm25_matrix = None
        self.bm25_documents = None

        # Resolve the reranker model from the supported config paths; read
        # from self.config so a None config does not raise AttributeError.
        self.reranker_model_name = (
            self.config.get('retrieval', {}).get('reranker_model') or
            self.config.get('ranker', {}).get('model') or
            self.config.get('reranker_model') or
            'BAAI/bge-reranker-v2-m3'
        )
        self.reranker_type = self._detect_reranker_type(self.reranker_model_name)

        # Initialize the reranker; fall back to None if loading fails
        try:
            if self.reranker_type == 'colbert':
                from colbert.infra import ColBERTConfig
                from colbert.modeling.checkpoint import Checkpoint

                # ColBERT uses late interaction over token-level embeddings
                print(f"✅ RERANKER: ColBERT model detected ({self.reranker_model_name})")
                print("🔍 INTERACTION TYPE: Late interaction (token-level embeddings)")

                # Create ColBERT config for CPU mode
                colbert_config = ColBERTConfig(
                    doc_maxlen=300,
                    query_maxlen=32,
                    nbits=2,
                    kmeans_niters=4,
                    root="./colbert_data"
                )

                # Load checkpoint (e.g. "colbert-ir/colbertv2.0")
                self.colbert_checkpoint = Checkpoint(self.reranker_model_name, colbert_config=colbert_config)
                self.colbert_model = self.colbert_checkpoint.model
                self.colbert_tokenizer = self.colbert_checkpoint.raw_tokenizer
                self.reranker = self._colbert_rerank  # attach wrapper function
                print("✅ COLBERT: Model and tokenizer loaded successfully")
            else:
                # Standard CrossEncoder for BGE and other models
                self.reranker = CrossEncoder(self.reranker_model_name)
                print(f"✅ RERANKER: Initialized {self.reranker_model_name}")
                print("🔍 INTERACTION TYPE: Cross-encoder (single relevance score)")
        except Exception as e:
            print(f"⚠️ Reranker initialization failed: {e}")
            self.reranker = None

    def _detect_reranker_type(self, model_name: str) -> str:
        """
        Detect the type of reranker based on model name.

        Args:
            model_name: Name of the reranker model

        Returns:
            'colbert' for ColBERT models, 'crossencoder' for others
        """
        # All ColBERT variants ('colbert-ir', 'colbertv2', 'colbert-v2', ...)
        # contain the substring 'colbert', so a single check covers them all.
        if 'colbert' in model_name.lower():
            return 'colbert'
        # Default to cross-encoder for BGE and other models
        return 'crossencoder'
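
    # For illustration (calls not executed at import time), detection maps:
    #   _detect_reranker_type("colbert-ir/colbertv2.0")   -> 'colbert'
    #   _detect_reranker_type("BAAI/bge-reranker-v2-m3")  -> 'crossencoder'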

    def _similarity_search_with_colbert_embeddings(self, query: str, k: int = 5, **kwargs) -> List[Tuple[Document, float]]:
        """
        Perform similarity search and fetch ColBERT embeddings for documents.

        Args:
            query: Search query
            k: Number of documents to retrieve
            **kwargs: Additional search parameters (filter, etc.)

        Returns:
            List of (Document, score) tuples with ColBERT embeddings in metadata
        """
        try:
            print("🔍 COLBERT RETRIEVAL: Fetching documents with ColBERT embeddings")

            # Use the vectorstore's similarity_search_with_score method instead
            # of the raw client so filters are handled consistently
            if kwargs.get('filter'):
                result = self.vectorstore.similarity_search_with_score(
                    query,
                    k=k,
                    filter=kwargs['filter']
                )
            else:
                result = self.vectorstore.similarity_search_with_score(query, k=k)

            # Normalize the result to parallel lists of documents and scores
            if isinstance(result, tuple) and len(result) == 2:
                documents, scores = result
            elif isinstance(result, list):
                documents = []
                scores = []
                for item in result:
                    if isinstance(item, tuple) and len(item) == 2:
                        doc, score = item
                        documents.append(doc)
                        scores.append(score)
                    else:
                        documents.append(item)
                        scores.append(0.0)
            else:
                documents = []
                scores = []

            # Fetch the ColBERT embeddings for these documents. The Qdrant
            # client is used directly here because we need specific payload
            # fields that the vectorstore wrapper does not expose.
            collection_name = self.vectorstore.collection_name

            # Collect document IDs. Note: the MD5 fallback is a last resort;
            # hash-derived IDs only match if the collection was populated
            # with the same ID scheme.
            doc_ids = []
            for doc in documents:
                doc_id = doc.metadata.get('id') or doc.metadata.get('_id')
                if not doc_id:
                    doc_id = hashlib.md5(doc.page_content.encode()).hexdigest()
                doc_ids.append(doc_id)

            # Fetch documents with ColBERT embeddings from Qdrant
            search_result = self.vectorstore.client.retrieve(
                collection_name=collection_name,
                ids=doc_ids,
                with_payload=True,
                with_vectors=False
            )

            # Rebuild Document objects with ColBERT embeddings attached,
            # mapping each point back to its original similarity score
            enhanced_documents = []
            enhanced_scores = []

            # Map doc_id (as str, matching str(point.id) below) to original score
            doc_id_to_score = {}
            for i, doc in enumerate(documents):
                doc_id = doc.metadata.get('id') or doc.metadata.get('_id')
                if not doc_id:
                    doc_id = hashlib.md5(doc.page_content.encode()).hexdigest()
                doc_id_to_score[str(doc_id)] = scores[i]

            for point in search_result:
                payload = point.payload
                original_score = doc_id_to_score.get(str(point.id), 0.0)

                # Create Document object with ColBERT embeddings in metadata
                doc = Document(
                    page_content=payload.get('page_content', ''),
                    metadata={
                        **payload.get('metadata', {}),
                        'colbert_embedding': payload.get('colbert_embedding'),
                        'colbert_model': payload.get('colbert_model'),
                        'colbert_calculated_at': payload.get('colbert_calculated_at')
                    }
                )
                enhanced_documents.append(doc)
                enhanced_scores.append(original_score)

            print(f"✅ COLBERT RETRIEVAL: Retrieved {len(enhanced_documents)} documents with ColBERT embeddings")
            return list(zip(enhanced_documents, enhanced_scores))
        except Exception as e:
            print(f"❌ COLBERT RETRIEVAL ERROR: {e}")
            print("❌ Falling back to regular similarity search")
            # Fallback to regular search, preserving the filter if present
            if kwargs.get('filter'):
                return self.vectorstore.similarity_search_with_score(query, k=k, filter=kwargs['filter'])
            return self.vectorstore.similarity_search_with_score(query, k=k)

    def retrieve_context(
        self,
        query: str,
        k: int = 5,
        reports: Optional[List[str]] = None,
        sources: Optional[List[str]] = None,
        subtype: Optional[str] = None,
        year: Optional[str] = None,
        district: Optional[List[str]] = None,
        filenames: Optional[List[str]] = None,
        use_reranking: bool = False,
        qdrant_filter: Optional[rest.Filter] = None
    ) -> List[Document]:
        """
        Retrieve context documents using hybrid search with optional filtering and reranking.

        Args:
            query: User query
            k: Number of documents to retrieve
            reports: List of report names to filter by
            sources: List of sources to filter by
            subtype: Document subtype to filter by
            year: Year to filter by
            district: List of districts to filter by
            filenames: List of filenames to filter by
            use_reranking: Whether to apply reranking
            qdrant_filter: Pre-built Qdrant filter to use

        Returns:
            List of retrieved documents
        """
        try:
            # Number of candidates to retrieve (could be raised to k * 3 when
            # reranking, to give the reranker a larger candidate pool)
            retrieve_k = k

            # Build search kwargs; an explicit qdrant_filter takes precedence
            search_kwargs = {}
            if qdrant_filter:
                search_kwargs = {"filter": qdrant_filter}
                print("✅ FILTERS APPLIED: Using inferred Qdrant filter")
            else:
                # Build filter from individual parameters
                filter_obj = create_filter(
                    reports=reports,
                    sources=sources,
                    subtype=subtype,
                    year=year,
                    district=district,
                    filenames=filenames
                )
                if filter_obj:
                    search_kwargs = {"filter": filter_obj}
                    print("✅ FILTERS APPLIED: Using built filter")
                else:
                    print("⚠️ NO FILTERS APPLIED: All documents will be searched")

            # Perform vector search
            try:
                # Use the ColBERT-aware search when ColBERT reranking is requested
                if use_reranking and self.reranker_type == 'colbert':
                    result = self._similarity_search_with_colbert_embeddings(
                        query,
                        k=retrieve_k,
                        **search_kwargs
                    )
                else:
                    result = self.vectorstore.similarity_search_with_score(
                        query,
                        k=retrieve_k,
                        **search_kwargs
                    )

                # Normalize the result to parallel lists of documents and scores
                if isinstance(result, tuple) and len(result) == 2:
                    documents, scores = result
                elif isinstance(result, list) and len(result) > 0:
                    documents = []
                    scores = []
                    for item in result:
                        if isinstance(item, tuple) and len(item) == 2:
                            doc, score = item
                            documents.append(doc)
                            scores.append(score)
                        else:
                            # Item is a bare Document; use a default score
                            documents.append(item)
                            scores.append(0.0)
                else:
                    documents = []
                    scores = []

                print(f"✅ RETRIEVAL SUCCESS: Retrieved {len(documents)} documents (requested: {retrieve_k})")

                # If we got fewer documents than requested, retry without filters
                if len(documents) < retrieve_k and search_kwargs.get('filter'):
                    print(f"⚠️ RETRIEVAL: Got {len(documents)} docs with filters, trying without filters...")
                    try:
                        result_no_filter = self.vectorstore.similarity_search_with_score(
                            query,
                            k=retrieve_k
                        )
                        if isinstance(result_no_filter, tuple) and len(result_no_filter) == 2:
                            documents_no_filter, scores_no_filter = result_no_filter
                        elif isinstance(result_no_filter, list):
                            documents_no_filter = []
                            scores_no_filter = []
                            for item in result_no_filter:
                                if isinstance(item, tuple) and len(item) == 2:
                                    doc, score = item
                                    documents_no_filter.append(doc)
                                    scores_no_filter.append(score)
                                else:
                                    documents_no_filter.append(item)
                                    scores_no_filter.append(0.0)
                        else:
                            documents_no_filter = []
                            scores_no_filter = []

                        if len(documents_no_filter) > len(documents):
                            print(f"✅ RETRIEVAL: Got {len(documents_no_filter)} docs without filters")
                            documents = documents_no_filter
                            scores = scores_no_filter
                    except Exception as e:
                        print(f"⚠️ RETRIEVAL: Fallback search failed: {e}")
            except Exception as e:
                print(f"❌ RETRIEVAL ERROR: {str(e)}")
                return []

            # Apply reranking if enabled
            reranking_applied = False
            if use_reranking and len(documents) > 1:
                print(f"🔄 RERANKING: Applying {self.reranker_model_name} to {len(documents)} documents...")
                try:
                    original_docs = documents.copy()
                    original_scores = scores.copy()
                    reranked_docs = self._apply_reranking(query, documents, scores)
                    reranking_applied = len(reranked_docs) > 0
                    if reranking_applied:
                        print(f"✅ RERANKING APPLIED: {self.reranker_model_name}")
                        # Reranked documents carry their new scores in metadata
                        documents = reranked_docs
                    else:
                        print("⚠️ RERANKING FAILED: Using original order")
                        documents = original_docs
                        scores = original_scores
                    return documents
                except Exception as e:
                    print(f"❌ RERANKING ERROR: {str(e)}")
                    print("⚠️ RERANKING FAILED: Using original order")
                    reranking_applied = False
            elif use_reranking:
                print(f"ℹ️ RERANKING: Skipped (only {len(documents)} document(s) retrieved)")

            if use_reranking:
                # Reranking was requested but skipped or failed; keep the
                # original order and record the original scores
                for doc, score in zip(documents, scores):
                    doc.metadata['original_score'] = float(score)
                    doc.metadata['reranking_applied'] = False
                return documents

            print("ℹ️ RERANKING: Skipped (disabled)")

            # Limit to the requested number of documents
            documents = documents[:k]
            scores = scores[:k] if scores else [0.0] * len(documents)

            # Attach ranking metadata to documents
            for i, (doc, score) in enumerate(zip(documents, scores)):
                if hasattr(doc, 'metadata'):
                    doc.metadata.update({
                        'reranking_applied': reranking_applied,
                        'reranker_model': self.reranker_model_name if reranking_applied else None,
                        'original_rank': i + 1,
                        'final_rank': i + 1,
                        'original_score': float(score) if score is not None else 0.0
                    })
            return documents
        except Exception as e:
            print(f"❌ CONTEXT RETRIEVAL ERROR: {str(e)}")
            return []

    def _apply_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Apply reranking to documents using the appropriate reranker.

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents
        """
        if not self.reranker or len(documents) == 0:
            return documents
        try:
            print(f"🔍 RERANKING METHOD: Starting reranking with {len(documents)} documents")
            print(f"🔍 RERANKING TYPE: {self.reranker_type.upper()}")
            if self.reranker_type == 'colbert':
                return self._apply_colbert_reranking(query, documents, scores)
            return self._apply_crossencoder_reranking(query, documents, scores)
        except Exception as e:
            print(f"❌ RERANKING ERROR: {str(e)}")
            return documents

    def _apply_crossencoder_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Apply reranking using CrossEncoder (BGE and other models).

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents
        """
        # Prepare (query, document) pairs for reranking
        pairs = [[query, doc.page_content] for doc in documents]
        print(f"🔍 CROSS-ENCODER: Prepared {len(pairs)} pairs for reranking")

        # Score all pairs with the CrossEncoder
        rerank_scores = self.reranker.predict(pairs)

        # Handle single-score case
        if not isinstance(rerank_scores, (list, np.ndarray)):
            rerank_scores = [rerank_scores]

        # Ensure we have the right number of scores
        if len(rerank_scores) != len(documents):
            print(f"⚠️ RERANKING WARNING: Expected {len(documents)} scores, got {len(rerank_scores)}")
            return documents

        print(f"🔍 CROSS-ENCODER: Got {len(rerank_scores)} rerank scores")
        print(f"🔍 CROSS-ENCODER SCORES: {rerank_scores[:5]}...")  # Show first 5 scores

        # Pair each document with its original index and rerank score, then
        # sort by rerank score (descending). Tracking the index up front is
        # safer than documents.index(doc), which misbehaves on duplicates.
        doc_scores = sorted(
            zip(range(len(documents)), documents, rerank_scores),
            key=lambda x: x[2],
            reverse=True
        )

        # Rebuild documents with reranking metadata
        reranked_docs = []
        for i, (original_idx, doc, rerank_score) in enumerate(doc_scores):
            original_score = scores[original_idx] if original_idx < len(scores) else 0.0
            new_doc = Document(
                page_content=doc.page_content,
                metadata={
                    **doc.metadata,
                    'reranking_applied': True,
                    'reranker_model': self.reranker_model_name,
                    'reranker_type': self.reranker_type,
                    'original_rank': original_idx + 1,
                    'final_rank': i + 1,
                    'original_score': float(original_score),
                    'reranked_score': float(rerank_score)
                }
            )
            reranked_docs.append(new_doc)

        print(f"✅ CROSS-ENCODER: Reranked {len(reranked_docs)} documents")
        return reranked_docs
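
    # Illustration (not executed here): a CrossEncoder scores each
    # (query, document) pair jointly and returns one relevance score per
    # pair, which is what predict() does above. The texts and score values
    # below are made up:
    #
    #   ce = CrossEncoder("BAAI/bge-reranker-v2-m3")
    #   ce.predict([["what causes flooding?", "Flooding occurs when ..."],
    #               ["what causes flooding?", "The annual budget ..."]])
    #   # -> e.g. array([ 2.3, -4.9 ])  (higher = more relevant)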

    def _apply_colbert_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Apply reranking using ColBERT late interaction.

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents
        """
        return self._colbert_rerank(query, documents, scores)

    def _colbert_rerank(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        ColBERT reranking using late interaction, with support for
        pre-calculated document embeddings.

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents
        """
        try:
            print(f"🔍 COLBERT: Starting late interaction reranking with {len(documents)} documents")

            # Separate documents with pre-calculated ColBERT embeddings from
            # those that still need to be encoded
            doc_embeddings_list = []
            documents_without_embeddings = []
            documents_without_indices = []
            for i, doc in enumerate(documents):
                if (hasattr(doc, 'metadata') and
                        doc.metadata.get('colbert_embedding') is not None):
                    # Use the pre-calculated embedding
                    colbert_embedding = doc.metadata['colbert_embedding']
                    if isinstance(colbert_embedding, list):
                        colbert_embedding = torch.tensor(colbert_embedding)
                    doc_embeddings_list.append(colbert_embedding)
                else:
                    documents_without_embeddings.append(doc)
                    documents_without_indices.append(i)

            # Encode the query into token-level embeddings
            query_embeddings = self.colbert_checkpoint.queryFromText([query])

            # Encode the documents that lack pre-calculated embeddings and
            # splice them back in at their original positions (the indices
            # are ascending, so insert() restores the original order)
            if documents_without_embeddings:
                print(f"🔄 COLBERT: Calculating embeddings for {len(documents_without_embeddings)} documents without pre-calculated embeddings")
                doc_texts = [doc.page_content for doc in documents_without_embeddings]
                doc_embeddings = self.colbert_checkpoint.docFromText(doc_texts)
                for i, embedding in enumerate(doc_embeddings):
                    doc_embeddings_list.insert(documents_without_indices[i], embedding)
            else:
                print(f"✅ COLBERT: Using pre-calculated embeddings for all {len(documents)} documents")

            # Late interaction scoring (MaxSim): for each query token, take
            # its maximum similarity over the document tokens, then sum over
            # query tokens to get the document score
            colbert_scores = []
            for doc_embedding in doc_embeddings_list:
                sim_matrix = torch.matmul(query_embeddings[0], doc_embedding.transpose(-1, -2))
                max_sim_per_query_token = torch.max(sim_matrix, dim=-1)[0]
                colbert_scores.append(torch.sum(max_sim_per_query_token).item())

            # Sort documents by ColBERT score (descending), keeping each
            # document's original index for metadata
            doc_scores = sorted(
                zip(range(len(documents)), documents, colbert_scores),
                key=lambda x: x[2],
                reverse=True
            )

            # Rebuild documents with reranking metadata
            reranked_docs = []
            for i, (original_idx, doc, colbert_score) in enumerate(doc_scores):
                original_score = scores[original_idx] if original_idx < len(scores) else 0.0
                new_doc = Document(
                    page_content=doc.page_content,
                    metadata={
                        **doc.metadata,
                        'reranking_applied': True,
                        'reranker_model': self.reranker_model_name,
                        'reranker_type': self.reranker_type,
                        'original_rank': original_idx + 1,
                        'final_rank': i + 1,
                        'original_score': float(original_score),
                        'reranked_score': float(colbert_score),
                        'colbert_score': float(colbert_score),
                        'colbert_embedding_pre_calculated': 'colbert_embedding' in doc.metadata
                    }
                )
                reranked_docs.append(new_doc)

            print(f"✅ COLBERT: Reranked {len(reranked_docs)} documents using late interaction")
            print(f"🔍 COLBERT SCORES: {[f'{score:.4f}' for score in colbert_scores[:5]]}...")
            return reranked_docs
        except Exception as e:
            print(f"❌ COLBERT RERANKING ERROR: {str(e)}")
            print(f"❌ COLBERT TRACEBACK: {traceback.format_exc()}")
            # Fall back to the original order
            return documents
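
    # Illustration (not executed here): MaxSim on toy tensors, mirroring the
    # scoring loop above. Shapes are (query_tokens, dim) and (doc_tokens, dim):
    #
    #   q = torch.randn(32, 128)           # query token embeddings
    #   d = torch.randn(300, 128)          # document token embeddings
    #   sim = q @ d.T                      # (32, 300) token-level similarities
    #   score = sim.max(dim=-1)[0].sum()   # best doc token per query token, summed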

    def retrieve_with_scores(self, query: str, vectorstore=None, k: int = 5, reports: List[str] = None,
                             sources: List[str] = None, subtype: List[str] = None,
                             year: List[str] = None, use_reranking: bool = False,
                             qdrant_filter: Optional[rest.Filter] = None) -> Tuple[List[Document], List[float]]:
        """
        Retrieve context documents with scores using hybrid search with optional reranking.

        Args:
            query: User query
            vectorstore: Optional vectorstore instance (for compatibility)
            k: Number of documents to retrieve
            reports: List of report names to filter by
            sources: List of sources to filter by
            subtype: List of document subtypes to filter by
            year: List of years to filter by
            use_reranking: Whether to apply reranking
            qdrant_filter: Pre-built Qdrant filter

        Returns:
            Tuple of (documents, scores)
        """
        try:
            # Use the provided vectorstore if available, otherwise keep the instance one
            if vectorstore:
                self.vectorstore = vectorstore

            # Determine search strategy
            search_strategy = self.config.get('retrieval', {}).get('search_strategy', 'vector_only')

            if search_strategy == 'vector_only':
                print(f"🔄 VECTOR SEARCH: Retrieving {k} documents...")
                if qdrant_filter:
                    print("✅ QDRANT FILTER APPLIED: Using inferred Qdrant filter")
                    results = self.vectorstore.similarity_search_with_score(
                        query,
                        k=k,
                        filter=qdrant_filter
                    )
                else:
                    # Build filter from individual parameters
                    filter_conditions = self._build_filter_conditions(reports, sources, subtype, year)
                    if filter_conditions:
                        print(f"✅ FILTER APPLIED: {filter_conditions}")
                        results = self.vectorstore.similarity_search_with_score(
                            query,
                            k=k,
                            filter=filter_conditions
                        )
                    else:
                        print("ℹ️ NO FILTERS APPLIED: All documents will be searched")
                        results = self.vectorstore.similarity_search_with_score(query, k=k)

                print(f"🔍 SEARCH DEBUG: Raw result type: {type(results)}")
                print(f"🔍 SEARCH DEBUG: Raw result length: {len(results)}")

                # Handle different result formats
                if results and isinstance(results[0], tuple):
                    documents = [doc for doc, score in results]
                    scores = [score for doc, score in results]
                    print(f"🔍 SEARCH DEBUG: After unpacking - documents: {len(documents)}, scores: {len(scores)}")
                else:
                    documents = results
                    scores = [0.0] * len(documents)
                    print("🔍 SEARCH DEBUG: No scores available, using default")

                print(f"🔧 CONVERTING: Converting {len(documents)} documents")

                # Copy into fresh Document objects and record original scores
                final_documents = []
                for i, (doc, score) in enumerate(zip(documents, scores)):
                    if hasattr(doc, 'page_content'):
                        new_doc = Document(
                            page_content=doc.page_content,
                            metadata=doc.metadata.copy()
                        )
                        new_doc.metadata['original_score'] = float(score) if score is not None else 0.0
                        final_documents.append(new_doc)
                    else:
                        print(f"⚠️ WARNING: Document {i} has no page_content")

                print(f"✅ RETRIEVAL SUCCESS: Retrieved {len(final_documents)} documents")

                # Apply reranking if enabled. Note: the returned scores list
                # keeps the original order; reranked scores live in metadata.
                if use_reranking and len(final_documents) > 1:
                    print(f"🔄 RERANKING: Applying {self.reranker_model_name} to {len(final_documents)} documents...")
                    final_documents = self._apply_reranking(query, final_documents, scores)
                    print(f"✅ RERANKING APPLIED: {self.reranker_model_name}")
                else:
                    print("ℹ️ RERANKING: Skipped (disabled or insufficient documents)")

                return final_documents, scores

            print(f"❌ UNSUPPORTED STRATEGY: {search_strategy}")
            return [], []
        except Exception as e:
            print(f"❌ RETRIEVAL ERROR: {e}")
            print(f"❌ RETRIEVAL TRACEBACK: {traceback.format_exc()}")
            return [], []

    def _build_filter_conditions(self, reports: List[str] = None, sources: List[str] = None,
                                 subtype: List[str] = None, year: List[str] = None) -> Optional[rest.Filter]:
        """
        Build Qdrant filter conditions from individual parameters.

        Args:
            reports: List of report names
            sources: List of sources
            subtype: List of document subtypes
            year: List of years

        Returns:
            Qdrant filter or None
        """
        conditions = []
        if reports:
            conditions.append(rest.FieldCondition(
                key="metadata.filename",
                match=rest.MatchAny(any=reports)
            ))
        if sources:
            conditions.append(rest.FieldCondition(
                key="metadata.source",
                match=rest.MatchAny(any=sources)
            ))
        if subtype:
            conditions.append(rest.FieldCondition(
                key="metadata.subtype",
                match=rest.MatchAny(any=subtype)
            ))
        if year:
            conditions.append(rest.FieldCondition(
                key="metadata.year",
                match=rest.MatchAny(any=year)
            ))
        if conditions:
            return rest.Filter(must=conditions)
        return None
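
    # For example (sketch; values illustrative), _build_filter_conditions(
    # sources=["report"], year=["2023"]) returns the equivalent of:
    #
    #   rest.Filter(must=[
    #       rest.FieldCondition(key="metadata.source",
    #                           match=rest.MatchAny(any=["report"])),
    #       rest.FieldCondition(key="metadata.year",
    #                           match=rest.MatchAny(any=["2023"])),
    #   ])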


def get_context(
    query: str,
    vectorstore: Qdrant,
    k: int = 5,
    reports: Optional[List[str]] = None,
    sources: Optional[List[str]] = None,
    subtype: Optional[str] = None,
    year: Optional[str] = None,
    use_reranking: bool = False,
    qdrant_filter: Optional[rest.Filter] = None
) -> List[Document]:
    """
    Convenience function to get context documents.

    Args:
        query: User query
        vectorstore: Qdrant vector store instance
        k: Number of documents to retrieve
        reports: Optional list of report names to filter by
        sources: Optional list of source categories to filter by
        subtype: Optional subtype to filter by
        year: Optional year to filter by
        use_reranking: Whether to apply reranking
        qdrant_filter: Optional pre-built Qdrant filter

    Returns:
        List of retrieved documents
    """
    retriever = ContextRetriever(vectorstore)
    return retriever.retrieve_context(
        query=query,
        k=k,
        reports=reports,
        sources=sources,
        subtype=subtype,
        year=year,
        use_reranking=use_reranking,
        qdrant_filter=qdrant_filter
    )


def format_context_for_llm(documents: List[Document]) -> str:
    """
    Format retrieved documents for LLM input.

    Args:
        documents: List of Document objects

    Returns:
        Formatted string for LLM
    """
    if not documents:
        return ""
    formatted_parts = []
    for i, doc in enumerate(documents, 1):
        content = doc.page_content.strip()
        source = doc.metadata.get('filename', 'Unknown')
        formatted_parts.append(f"Document {i} (Source: {source}):\n{content}")
    return "\n\n".join(formatted_parts)


def get_context_metadata(documents: List[Document]) -> Dict[str, Any]:
    """
    Extract metadata summary from retrieved documents.

    Args:
        documents: List of Document objects

    Returns:
        Dictionary with metadata summary
    """
    if not documents:
        return {}
    sources = set()
    years = set()
    doc_types = set()
    for doc in documents:
        metadata = doc.metadata
        if 'filename' in metadata:
            sources.add(metadata['filename'])
        if 'year' in metadata:
            years.add(metadata['year'])
        if 'source' in metadata:
            doc_types.add(metadata['source'])
    return {
        "num_documents": len(documents),
        "sources": list(sources),
        "years": list(years),
        "document_types": list(doc_types)
    }
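

# Minimal usage sketch (assumes a populated Qdrant-backed vectorstore `vs`;
# the query and filter values are illustrative):
#
#   docs = get_context("What did the 2023 reports say about flooding?",
#                      vectorstore=vs, k=5, year="2023", use_reranking=True)
#   print(format_context_for_llm(docs))
#   print(get_context_metadata(docs))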