Spaces:
Running
Running
| """ | |
| User Feedback Tracking System | |
| Tracks user feedback on search results for continuous improvement: | |
| - Thumbs up/down on answers | |
| - Relevance ratings on sources | |
| - Intent classification accuracy | |
| - Search strategy effectiveness | |
| Stores feedback in ClickHouse for analysis and model improvement. | |
| """ | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from dataclasses import dataclass, asdict | |
| import json | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FeedbackEvent:
    """One user feedback event together with the query context it refers to.

    Declared as a dataclass so it can be constructed with keyword arguments
    (one per field below) and converted with ``dataclasses.asdict``. All
    fields are required; none has a default.
    """

    # Identifiers
    session_id: str
    query_id: str
    user_id: Optional[int]

    # Query info
    query: str
    expanded_query: Optional[str]

    # Classification info
    intent_classified: str
    intent_confidence: float
    intent_method: str

    # Search info
    search_strategy: str
    live_results_count: int
    db_results_count: int
    total_sources: int

    # Feedback
    feedback_type: str  # "thumbs_up", "thumbs_down", "source_rating", "intent_correction"
    feedback_value: Any  # True/False for thumbs, 1-5 for rating, corrected intent for correction
    feedback_comment: Optional[str]

    # Metadata
    timestamp: str
    response_time_ms: float
    cache_hit: bool
class FeedbackTracker:
    """
    Track and store user feedback for continuous improvement.

    Features:
    - Multiple feedback types (thumbs, ratings, corrections)
    - ClickHouse storage for analytics
    - Best-effort operation: database errors are logged, never raised
    - Aggregation and reporting

    Every database call is made synchronously on the caller's thread.
    When no ``analytics_db`` is configured, recording only logs the event
    and the reporting methods return empty results.
    """

    def __init__(self, analytics_db=None):
        """
        Initialize feedback tracker.

        Args:
            analytics_db: ClickHouse analytics database adapter. This class
                calls ``execute(sql[, rows])`` and ``query(sql)`` on it.
        """
        self.analytics_db = analytics_db
        self._ensure_table_exists()

    def _ensure_table_exists(self):
        """Create the ``user_feedback`` table if it doesn't exist.

        Does nothing without a database adapter. Failures are logged and
        swallowed so that constructing the tracker never raises.
        """
        if not self.analytics_db:
            return

        try:
            create_table_query = """
                CREATE TABLE IF NOT EXISTS user_feedback (
                    session_id String,
                    query_id String,
                    user_id Nullable(Int32),
                    query String,
                    expanded_query Nullable(String),
                    intent_classified String,
                    intent_confidence Float32,
                    intent_method String,
                    search_strategy String,
                    live_results_count Int32,
                    db_results_count Int32,
                    total_sources Int32,
                    feedback_type String,
                    feedback_value String,
                    feedback_comment Nullable(String),
                    timestamp DateTime,
                    response_time_ms Float32,
                    cache_hit UInt8
                ) ENGINE = MergeTree()
                ORDER BY (timestamp, session_id)
            """
            self.analytics_db.execute(create_table_query)
            logger.info("Feedback table ensured")
        except Exception as e:
            logger.error("Failed to create feedback table: %s", e)

    def record_feedback(
        self,
        session_id: str,
        query: str,
        feedback_type: str,
        feedback_value: Any,
        query_metadata: Dict[str, Any],
        feedback_comment: Optional[str] = None,
        user_id: Optional[int] = None
    ):
        """
        Record user feedback.

        Builds a FeedbackEvent from the arguments, stores it when a database
        adapter is configured, and logs the event. Never raises: any failure
        is logged instead.

        Args:
            session_id: User session ID
            query: Original query
            feedback_type: Type of feedback (thumbs_up, thumbs_down, etc.)
            feedback_value: Feedback value (stored as its ``str()`` form)
            query_metadata: Metadata about the query and response. Missing
                keys fall back to defaults; ``None`` is treated as empty.
            feedback_comment: Optional comment from user
            user_id: Optional user ID
        """
        # Function-scope import so this method does not depend on a change
        # to the file-level import block.
        from datetime import timezone

        try:
            metadata = query_metadata or {}

            # Read the clock once so the fallback query_id and the stored
            # timestamp describe the same instant. An aware datetime makes
            # .timestamp() a true epoch value regardless of the host's zone.
            now = datetime.now(timezone.utc)
            fallback_query_id = f"{session_id}_{int(now.timestamp())}"

            # Create feedback event
            event = FeedbackEvent(
                session_id=session_id,
                # `or` also covers an explicit None / empty query_id.
                query_id=metadata.get("query_id") or fallback_query_id,
                user_id=user_id,
                query=query,
                expanded_query=metadata.get("expanded_query"),
                intent_classified=metadata.get("intent", "UNKNOWN"),
                intent_confidence=metadata.get("intent_confidence", 0.0),
                intent_method=metadata.get("intent_method", "unknown"),
                search_strategy=metadata.get("search_strategy", "unknown"),
                live_results_count=metadata.get("live_results_count", 0),
                db_results_count=metadata.get("db_results_count", 0),
                total_sources=metadata.get("total_sources", 0),
                feedback_type=feedback_type,
                feedback_value=str(feedback_value),
                feedback_comment=feedback_comment,
                # Naive UTC ISO-8601 string (no offset suffix).
                # NOTE(review): the target column is DateTime; confirm the
                # adapter accepts an ISO string rather than a datetime.
                timestamp=now.replace(tzinfo=None).isoformat(),
                response_time_ms=metadata.get("response_time_ms", 0.0),
                cache_hit=metadata.get("cache_hit", False)
            )

            # Store in ClickHouse
            if self.analytics_db:
                self._store_feedback(event)

            # Log feedback
            logger.info(
                "Feedback recorded: %s=%s for query='%s' (intent=%s)",
                feedback_type,
                feedback_value,
                query,
                event.intent_classified,
            )
        except Exception as e:
            logger.error("Failed to record feedback: %s", e)

    def _store_feedback(self, event: "FeedbackEvent"):
        """Store one feedback event in ClickHouse.

        The column list and the value tuple below must stay in the same
        order. Failures are logged and swallowed.
        """
        try:
            insert_query = """
                INSERT INTO user_feedback (
                    session_id, query_id, user_id,
                    query, expanded_query,
                    intent_classified, intent_confidence, intent_method,
                    search_strategy, live_results_count, db_results_count, total_sources,
                    feedback_type, feedback_value, feedback_comment,
                    timestamp, response_time_ms, cache_hit
                ) VALUES
            """
            values = (
                event.session_id,
                event.query_id,
                event.user_id,
                event.query,
                event.expanded_query,
                event.intent_classified,
                event.intent_confidence,
                event.intent_method,
                event.search_strategy,
                event.live_results_count,
                event.db_results_count,
                event.total_sources,
                event.feedback_type,
                event.feedback_value,
                event.feedback_comment,
                event.timestamp,
                event.response_time_ms,
                # The cache_hit column is UInt8, so send 0/1 rather than a bool.
                1 if event.cache_hit else 0
            )
            # Row values are passed separately from the SQL text.
            self.analytics_db.execute(insert_query, [values])
        except Exception as e:
            logger.error("Failed to store feedback in ClickHouse: %s", e)

    def get_feedback_stats(self, days: int = 7) -> Dict[str, Any]:
        """
        Get feedback statistics for the last N days.

        Args:
            days: Number of days to analyze. Coerced with ``int()`` before it
                is placed in the SQL text.

        Returns:
            Dictionary with ``total_feedback``, ``by_type`` and
            ``period_days``; an empty dict when no database is configured or
            on any error (including a non-numeric ``days``).
        """
        if not self.analytics_db:
            return {}

        try:
            # Only an integer literal may reach the query text.
            window_days = int(days)
            query = f"""
                SELECT
                    feedback_type,
                    COUNT(*) as count,
                    AVG(intent_confidence) as avg_confidence,
                    AVG(response_time_ms) as avg_response_time,
                    SUM(cache_hit) / COUNT(*) as cache_hit_rate
                FROM user_feedback
                WHERE timestamp >= now() - INTERVAL {window_days} DAY
                GROUP BY feedback_type
                ORDER BY count DESC
            """
            # Materialise once: the rows are traversed twice below, which
            # would silently drop data if the adapter returned an iterator.
            rows = list(self.analytics_db.query(query) or [])
            stats = {
                "total_feedback": sum(r["count"] for r in rows),
                "by_type": {
                    r["feedback_type"]: {
                        "count": r["count"],
                        "avg_confidence": r["avg_confidence"],
                        "avg_response_time": r["avg_response_time"],
                        "cache_hit_rate": r["cache_hit_rate"]
                    }
                    for r in rows
                },
                "period_days": window_days
            }
            return stats
        except Exception as e:
            logger.error("Failed to get feedback stats: %s", e)
            return {}

    def get_intent_accuracy(self, days: int = 7) -> Dict[str, Any]:
        """
        Get intent classification accuracy based on user corrections.

        For each classified intent, accuracy is
        ``1 - corrections / total``, where ``total`` counts every feedback
        row for that intent and ``corrections`` counts the rows whose
        feedback_type is ``intent_correction``.

        Args:
            days: Number of days to analyze. Coerced with ``int()`` before it
                is placed in the SQL text.

        Returns:
            Dictionary with ``by_intent`` and ``period_days``; an empty dict
            when no database is configured or on any error.
        """
        if not self.analytics_db:
            return {}

        try:
            # Only an integer literal may reach the query text.
            window_days = int(days)
            query = f"""
                SELECT
                    intent_classified,
                    COUNT(*) as total,
                    SUM(CASE WHEN feedback_type = 'intent_correction' THEN 1 ELSE 0 END) as corrections,
                    AVG(intent_confidence) as avg_confidence
                FROM user_feedback
                WHERE timestamp >= now() - INTERVAL {window_days} DAY
                GROUP BY intent_classified
                ORDER BY total DESC
            """
            rows = list(self.analytics_db.query(query) or [])
            accuracy = {
                "by_intent": {
                    r["intent_classified"]: {
                        "total": r["total"],
                        "corrections": r["corrections"],
                        # Guard the division; a zero total reports 0.0.
                        "accuracy": (
                            1.0 - (r["corrections"] / r["total"])
                            if r["total"] > 0
                            else 0.0
                        ),
                        "avg_confidence": r["avg_confidence"]
                    }
                    for r in rows
                },
                "period_days": window_days
            }
            return accuracy
        except Exception as e:
            logger.error("Failed to get intent accuracy: %s", e)
            return {}

    def get_low_confidence_queries(self, threshold: float = 0.7, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get queries with low intent classification confidence.

        Args:
            threshold: Confidence threshold (queries below this). Coerced
                with ``float()`` before it is placed in the SQL text.
            limit: Maximum number of queries to return. Coerced with
                ``int()`` before it is placed in the SQL text.

        Returns:
            List of low-confidence queries, most frequent first; an empty
            list when no database is configured or on any error.
        """
        if not self.analytics_db:
            return []

        try:
            # Only numeric literals may reach the query text.
            max_confidence = float(threshold)
            max_rows = int(limit)
            query = f"""
                SELECT
                    query,
                    intent_classified,
                    intent_confidence,
                    intent_method,
                    COUNT(*) as occurrences
                FROM user_feedback
                WHERE intent_confidence < {max_confidence}
                GROUP BY query, intent_classified, intent_confidence, intent_method
                ORDER BY occurrences DESC, intent_confidence ASC
                LIMIT {max_rows}
            """
            return list(self.analytics_db.query(query) or [])
        except Exception as e:
            logger.error("Failed to get low confidence queries: %s", e)
            return []
# ===========================================================================
# SINGLETON INSTANCE
# ===========================================================================
# Will be initialized with dependencies in main.py
# Process-wide tracker; stays None until initialize_feedback_tracker() runs.
feedback_tracker: Optional[FeedbackTracker] = None


def initialize_feedback_tracker(analytics_db=None):
    """Build a FeedbackTracker and install it as the module-level instance.

    Args:
        analytics_db: Database adapter handed to the FeedbackTracker
            constructor; may be None.
    """
    global feedback_tracker
    tracker = FeedbackTracker(analytics_db)
    feedback_tracker = tracker
    logger.info("Feedback tracker initialized")