Spaces:
Paused
Paused
# models/sentiment/sentiment_utils.py
"""
Sentiment Analysis Model Utilities for PENNY Project
Handles text sentiment classification for user input analysis and content moderation.
Provides async sentiment analysis with structured error handling and logging.
"""
import asyncio
import time
import os
import httpx
from typing import Dict, Any, Optional, List

# --- Logging Imports ---
from app.logging_utils import log_interaction, sanitize_for_logging

# --- Hugging Face API Configuration ---
# Inference endpoint for the cardiffnlp RoBERTa sentiment model.
# The model emits raw labels LABEL_0/LABEL_1/LABEL_2, which the functions
# below map to NEGATIVE/NEUTRAL/POSITIVE.
HF_API_URL = "https://api-inference.huggingface.co/models/cardiffnlp/twitter-roberta-base-sentiment"
# API token read from the environment at import time; a missing or empty
# value makes is_sentiment_available() report the service as unavailable.
HF_TOKEN = os.getenv("HF_TOKEN")
AGENT_NAME = "penny-sentiment-agent"
def is_sentiment_available() -> bool:
    """
    Report whether the sentiment analysis service can be used.

    The service is considered available when an HF_TOKEN was present in the
    environment at import time (i.e. the token is a non-empty string).

    Returns:
        bool: True if the sentiment API is configured and ready.
    """
    # bool() treats both None and "" as falsy, which is exactly the
    # "configured" check: a non-empty token string.
    return bool(HF_TOKEN)
async def get_sentiment_analysis(
    text: str,
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Run sentiment analysis on the input text via the Hugging Face Inference API.

    Args:
        text: The string of text to analyze (max 10,000 characters).
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
            - label (str): Sentiment label ("POSITIVE", "NEGATIVE", "NEUTRAL"),
              or "ERROR"/"UNKNOWN" when analysis could not be performed.
            - score (float): Confidence score for the sentiment prediction.
            - available (bool): Whether the service was available.
            - message (str, optional): Error message if analysis failed.
            - response_time_ms (int, optional): Analysis time in milliseconds.
    """
    start_time = time.time()

    def _unexpected_format(error: str, response_time_ms: int) -> Dict[str, Any]:
        # Shared handler for "model output not in the expected shape" failures:
        # log the anomaly and return a uniform error payload. Deduplicates three
        # previously identical log-and-return sites.
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=error,
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100])
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": True,
            "message": "Sentiment analysis returned unexpected format."
        }

    # Service availability: without an HF token there is nothing to call.
    if not is_sentiment_available():
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment API not configured (missing HF_TOKEN)",
            fallback_used=True
        )
        return {
            "label": "UNKNOWN",
            "score": 0.0,
            "available": False,
            "message": "Sentiment analysis is temporarily unavailable."
        }

    # Validate input: reject None, empty strings, and non-string values.
    if not text or not isinstance(text, str):
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid text input"
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": True,
            "message": "Invalid text input provided."
        }

    # Check text length (prevent processing extremely long texts).
    if len(text) > 10000:  # 10k character limit
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=f"Text too long: {len(text)} characters",
            text_preview=sanitize_for_logging(text[:100])
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": True,
            "message": "Text is too long for sentiment analysis (max 10,000 characters)."
        }

    try:
        # Prepare API request.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        payload = {"inputs": text}

        # Call Hugging Face Inference API.
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(HF_API_URL, json=payload, headers=headers)

        response_time_ms = int((time.time() - start_time) * 1000)

        if response.status_code != 200:
            log_interaction(
                intent="sentiment_analysis",
                tenant_id=tenant_id,
                success=False,
                error=f"API returned status {response.status_code}",
                response_time_ms=response_time_ms,
                text_preview=sanitize_for_logging(text[:100]),
                fallback_used=True
            )
            return {
                "label": "ERROR",
                "score": 0.0,
                "available": False,
                "message": f"Sentiment API error: {response.status_code}",
                "response_time_ms": response_time_ms
            }

        results = response.json()

        # Validate results.
        # API returns: [[{"label": "LABEL_2", "score": 0.95}, ...]]
        if not results or not isinstance(results, list) or len(results) == 0:
            return _unexpected_format("Empty or invalid model output", response_time_ms)

        # Get the first (highest scoring) result; tolerate both the nested
        # list-of-lists shape and a flat list of result dicts.
        result_list = results[0] if isinstance(results[0], list) else results
        if not result_list or len(result_list) == 0:
            return _unexpected_format("Empty result list", response_time_ms)

        result = result_list[0]

        # Validate result structure.
        if not isinstance(result, dict) or 'label' not in result or 'score' not in result:
            return _unexpected_format("Invalid result structure", response_time_ms)

        # Map RoBERTa labels to readable format.
        # LABEL_0 = NEGATIVE, LABEL_1 = NEUTRAL, LABEL_2 = POSITIVE
        label_mapping = {
            "LABEL_0": "NEGATIVE",
            "LABEL_1": "NEUTRAL",
            "LABEL_2": "POSITIVE"
        }
        label = label_mapping.get(result['label'], result['label'])
        score = float(result['score'])

        # Log slow analysis separately so dashboards can alert on latency.
        if response_time_ms > 3000:  # 3 seconds
            log_interaction(
                intent="sentiment_analysis_slow",
                tenant_id=tenant_id,
                success=True,
                response_time_ms=response_time_ms,
                details="Slow sentiment analysis detected",
                text_length=len(text)
            )

        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            sentiment_label=label,
            sentiment_score=score,
            text_length=len(text)
        )
        return {
            "label": label,
            "score": score,
            "available": True,
            "response_time_ms": response_time_ms
        }

    except httpx.TimeoutException:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment analysis request timed out",
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": False,
            "message": "Sentiment analysis request timed out.",
            "response_time_ms": response_time_ms
        }
    except asyncio.CancelledError:
        # Task cancellation must propagate; log it and re-raise.
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Analysis cancelled"
        )
        raise
    except Exception as e:
        # Catch-all boundary for network errors, JSON decode failures, etc.
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return {
            "label": "ERROR",
            "score": 0.0,
            "available": False,
            "message": "An error occurred during sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }
async def analyze_sentiment_batch(
    texts: List[str],
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Run sentiment analysis on a batch of texts for efficiency.

    Blank or non-string entries are dropped and the batch is truncated to the
    first 100 valid texts before the API call.

    Args:
        texts: List of text strings to analyze.
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
            - results (list): One result per submitted (valid) text, in order.
              Entries the model could not score come back as
              {"label": "ERROR", "score": 0.0} so positions stay aligned.
            - available (bool): Whether the service was available.
            - total_analyzed (int): Number of texts successfully analyzed.
            - response_time_ms (int, optional): Total batch analysis time.
    """
    start_time = time.time()

    # Service availability: without an HF token there is nothing to call.
    if not is_sentiment_available():
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Sentiment API not configured (missing HF_TOKEN)",
            batch_size=len(texts) if texts else 0
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "Sentiment analysis is temporarily unavailable."
        }

    # Validate input container.
    if not texts or not isinstance(texts, list):
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Invalid texts input"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "Invalid batch input provided."
        }

    # Filter valid texts and limit batch size.
    valid_texts = [t for t in texts if isinstance(t, str) and t.strip()]
    if len(valid_texts) > 100:  # Batch size limit
        valid_texts = valid_texts[:100]

    if not valid_texts:
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="No valid texts in batch"
        )
        return {
            "results": [],
            "available": True,
            "total_analyzed": 0,
            "message": "No valid texts provided for analysis."
        }

    try:
        # Prepare API request with batch input.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        payload = {"inputs": valid_texts}

        # Call Hugging Face Inference API (longer timeout for batch work).
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(HF_API_URL, json=payload, headers=headers)

        response_time_ms = int((time.time() - start_time) * 1000)

        if response.status_code != 200:
            log_interaction(
                intent="sentiment_batch_analysis",
                tenant_id=tenant_id,
                success=False,
                error=f"API returned status {response.status_code}",
                response_time_ms=response_time_ms,
                batch_size=len(valid_texts)
            )
            return {
                "results": [],
                "available": False,
                "total_analyzed": 0,
                "message": f"Sentiment API error: {response.status_code}",
                "response_time_ms": response_time_ms
            }

        results = response.json()

        # Map RoBERTa labels to readable format.
        label_mapping = {
            "LABEL_0": "NEGATIVE",
            "LABEL_1": "NEUTRAL",
            "LABEL_2": "POSITIVE"
        }

        # Process results. Malformed items get an ERROR placeholder instead of
        # being silently dropped, so results stay positionally aligned with the
        # submitted texts; total_analyzed counts only successful items.
        processed_results = []
        analyzed_count = 0
        if results and isinstance(results, list):
            for item in results:
                entry = {"label": "ERROR", "score": 0.0}
                if isinstance(item, list) and len(item) > 0:
                    top_result = item[0]
                    if isinstance(top_result, dict) and 'label' in top_result:
                        entry = {
                            "label": label_mapping.get(top_result['label'], top_result['label']),
                            "score": float(top_result.get('score', 0.0))
                        }
                        analyzed_count += 1
                processed_results.append(entry)

        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts),
            total_analyzed=analyzed_count
        )
        return {
            "results": processed_results,
            "available": True,
            "total_analyzed": analyzed_count,
            "response_time_ms": response_time_ms
        }

    except httpx.TimeoutException:
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Batch sentiment analysis timed out",
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts)
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "Batch sentiment analysis timed out.",
            "error": "Request timeout",
            "response_time_ms": response_time_ms
        }
    except asyncio.CancelledError:
        # Task cancellation must propagate; log it and re-raise (matches the
        # single-text function's handling).
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error="Analysis cancelled"
        )
        raise
    except Exception as e:
        # Catch-all boundary for network errors, JSON decode failures, etc.
        response_time_ms = int((time.time() - start_time) * 1000)
        log_interaction(
            intent="sentiment_batch_analysis",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            batch_size=len(valid_texts)
        )
        return {
            "results": [],
            "available": False,
            "total_analyzed": 0,
            "message": "An error occurred during batch sentiment analysis.",
            "error": str(e),
            "response_time_ms": response_time_ms
        }