import numpy as np
from typing import List, Tuple, Dict, Any
from tf_data_pipeline import TFDataPipeline
from logger_config import config_logger

logger = config_logger(__name__)


class ResponseQualityChecker:
    """
    The Response Quality Checker measures:
    - Relevance (embedding or cross-encoder)
    - Diversity among top responses
    - Length
    - Score gap
    - Confidence
    """

    def __init__(
        self,
        data_pipeline: "TFDataPipeline",
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.10,
        min_response_length: int = 5,
        similarity_cap: float = 0.90
    ):
        """
        Args:
            data_pipeline: TFDataPipeline used for encoding queries and responses
            confidence_threshold: Minimum top_score required to be 'confident'
            diversity_threshold: Minimum average diversity among top responses
            min_response_length: Minimum word count for a 'valid length' response
            similarity_cap: Cap on pairwise similarity to reduce the effect of outliers
        """
        self.data_pipeline = data_pipeline
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap

        # Additional thresholds used by _determine_confidence
        self.thresholds = {
            'relevance': 0.30,
            'length_score': 0.80,
            'score_gap': 0.05
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of the top-k responses and return:
        - response_diversity
        - query_response_relevance
        - response_length_score
        - top_score
        - top_3_score_gap
        - is_confident
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'response_length_score': 0.0,
                'top_score': 0.0,
                'top_3_score_gap': 0.0,
                'is_confident': False
            }

        metrics = {}
        metrics['response_diversity'] = self._calc_diversity(responses)
        metrics['query_response_relevance'] = self._calc_relevance(query, responses)
        metrics['response_length_score'] = self._calc_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calc_score_gap([score for _, score in responses])
        metrics['is_confident'] = self._determine_confidence(metrics)
        return metrics

    def _calc_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """
        Diversity = 1 - average pairwise similarity among top response embeddings,
        with each pairwise similarity capped at self.similarity_cap.
        """
        if len(responses) < 2:
            return 1.0  # A single response is treated as maximally diverse
        texts = [r for r, _ in responses]
        embs = self.data_pipeline.encode_responses(texts)
        sim_matrix = self._cosine_similarity(embs, embs)

        # Zero out the diagonal (self-similarity) before averaging
        np.fill_diagonal(sim_matrix, 0.0)
        # Cap pairwise similarity to limit the influence of near-duplicates
        sim_matrix = np.minimum(sim_matrix, self.similarity_cap)

        sum_sims = np.sum(sim_matrix)
        count = len(responses) * (len(responses) - 1)
        avg_sim = sum_sims / count if count > 0 else 0.0
        return 1.0 - avg_sim

    def _calc_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """
        Weighted average of exponentially transformed query-response similarities
        for the top-k responses. Higher-ranked responses receive larger weights,
        so strong similarity with the top responses is rewarded.
""" if not responses: return 0.0 query_emb = self.data_pipeline.encode_query(query) texts = [r for r, _ in responses] resp_embs = self.data_pipeline.encode_responses(texts) query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-8) norms = (np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-8) resp_embs = resp_embs / norms # Cosine similarity, then exponential transform sims = np.sum(query_emb[np.newaxis, :] * resp_embs, axis=1) # shape [k] sims = np.exp(sims - 1.0) # Weighted average to boost top responses weights = np.exp(-np.arange(len(responses)) / 2.0) weighted_avg = np.average(sims, weights=weights) return float(weighted_avg) def _calc_length_score(self, responses: List[Tuple[str, float]]) -> float: """ Average length-based score across top responses. """ scores = [] for text, _ in responses: words = len(text.strip().split()) if words < self.min_response_length: # Penalty for too short s = words / float(self.min_response_length) elif words > 50: # Penalty for excessive length s = max(0.5, 50.0 / words) else: s = 1.0 scores.append(s) return float(np.mean(scores)) if scores else 0.0 def _calc_score_gap(self, scores: List[float], top_n: int = 3) -> float: """ Average difference between consecutive ranks for top_n. """ if len(scores) < 2: return 0.0 top_n = min(top_n, len(scores)) gaps = [] for i in range(top_n - 1): gaps.append(scores[i] - scores[i + 1]) return float(np.mean(gaps)) if gaps else 0.0 def _determine_confidence(self, m: Dict[str, float]) -> bool: """ Require: - top_score >= self.confidence_threshold - response_diversity >= self.diversity_threshold - response_length_score >= self.thresholds['length_score'] Secondary conditions (2 of 3 required): - query_response_relevance >= self.thresholds['relevance'] - top_3_score_gap >= self.thresholds['score_gap'] - top_score >= (confidence_threshold + 0.05) """ primary = [ m['top_score'] >= self.confidence_threshold, m['response_diversity'] >= self.diversity_threshold, m['response_length_score'] >= self.thresholds['length_score'] ] secondary = [ m['query_response_relevance'] >= self.thresholds['relevance'], m['top_3_score_gap'] >= self.thresholds['score_gap'], m['top_score'] >= (self.confidence_threshold + 0.05) ] if all(primary) and sum(secondary) >= 2: return True return False def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: """Manual cosine sim matrix: a-> shape [N, d], b-> shape [M, d]. Return shape [N, M].""" a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-8) b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-8) return np.dot(a_norm, b_norm.T)