| """API endpoints for advanced analytics.""" |
|
|
| import logging |
| from typing import List, Dict, Optional |
| from fastapi import APIRouter, HTTPException |
| from pydantic import BaseModel, Field, validator |
|
|
| from analysis.predictive_intervals import ( |
| calculate_predictive_interval, |
| rank_by_predictive_interval, |
| get_top_positive_by_interval, |
| get_top_negative_by_interval |
| ) |
| from analysis.category_analytics import CategoryAnalytics |
| from analysis.thread_analysis import ThreadAnalyzer |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| router = APIRouter(prefix="/analytics", tags=["analytics"]) |
|
|
| |
| category_analytics: Optional[CategoryAnalytics] = None |
| thread_analyzer: Optional[ThreadAnalyzer] = None |
|
|
|
|
| def get_category_analytics() -> CategoryAnalytics: |
| """Get or create category analytics instance.""" |
| global category_analytics |
| if category_analytics is None: |
| category_analytics = CategoryAnalytics() |
| return category_analytics |
|
|
|
|
| def get_thread_analyzer() -> ThreadAnalyzer: |
| """Get or create thread analyzer instance.""" |
| global thread_analyzer |
| if thread_analyzer is None: |
| thread_analyzer = ThreadAnalyzer() |
| return thread_analyzer |
|
|
|
|
| |
| class SentimentCounts(BaseModel): |
| """Sentiment counts for an item.""" |
| |
| id: str = Field(..., description="Item identifier") |
| positive_count: int = Field(..., description="Number of positive comments", ge=0) |
| negative_count: int = Field(..., description="Number of negative comments", ge=0) |
| neutral_count: int = Field(0, description="Number of neutral comments", ge=0) |
|
|
|
|
| class PredictiveIntervalRequest(BaseModel): |
| """Request model for predictive interval calculation.""" |
| |
| data: List[SentimentCounts] = Field(..., description="List of items with sentiment counts", min_items=1) |
| confidence_level: float = Field(0.95, description="Confidence level", ge=0.90, le=0.99) |
| |
| class Config: |
| json_schema_extra = { |
| "example": { |
| "data": [ |
| {"id": "news_1", "positive_count": 80, "negative_count": 20, "neutral_count": 0}, |
| {"id": "news_2", "positive_count": 1, "negative_count": 0, "neutral_count": 0} |
| ], |
| "confidence_level": 0.95 |
| } |
| } |
|
|
|
|
| class PredictiveIntervalResponse(BaseModel): |
| """Response model for predictive interval calculation.""" |
| |
| ranked_data: List[Dict] = Field(..., description="Items ranked by predictive interval") |
| top_positive: List[Dict] = Field(..., description="Top positive items") |
| top_negative: List[Dict] = Field(..., description="Top negative items") |
|
|
|
|
| class CategorySentimentRequest(BaseModel): |
| """Request model for category sentiment analysis.""" |
| |
| data: List[Dict[str, str]] = Field(..., description="List of items with category and text", min_items=1) |
| |
| class Config: |
| json_schema_extra = { |
| "example": { |
| "data": [ |
| {"category": "politics", "text": "Отличная новость!"}, |
| {"category": "politics", "text": "Ужасная ситуация..."}, |
| {"category": "economy", "text": "Нормально"} |
| ] |
| } |
| } |
|
|
|
|
| class CategorySentimentResponse(BaseModel): |
| """Response model for category sentiment analysis.""" |
| |
| category_stats: Dict[str, Dict] = Field(..., description="Statistics per category") |
| top_positive_categories: List[Dict] = Field(..., description="Top positive categories") |
| top_negative_categories: List[Dict] = Field(..., description="Top negative categories") |
|
|
|
|
| class ThreadAnalysisRequest(BaseModel): |
| """Request model for thread analysis.""" |
| |
| data: List[Dict[str, str]] = Field(..., description="List of comments with news_id and text", min_items=1) |
| |
| class Config: |
| json_schema_extra = { |
| "example": { |
| "data": [ |
| {"news_id": "1", "text": "Отлично!"}, |
| {"news_id": "1", "text": "Ужасно!"}, |
| {"news_id": "2", "text": "Нормально"} |
| ] |
| } |
| } |
|
|
|
|
| class ThreadAnalysisResponse(BaseModel): |
| """Response model for thread analysis.""" |
| |
| thread_stats: List[Dict] = Field(..., description="Thread statistics per news item") |
| correlation: Dict = Field(..., description="Correlation analysis results") |
|
|
|
|
| |
| @router.post("/predictive-intervals", response_model=PredictiveIntervalResponse) |
| async def calculate_predictive_intervals(request: PredictiveIntervalRequest): |
| """ |
| Calculate predictive intervals for ranking items by positive sentiment. |
| |
| Uses Beta distribution to account for uncertainty when sample sizes are small. |
| Useful for ranking news articles or categories by positive sentiment. |
| |
| Args: |
| request: Request with sentiment counts and confidence level |
| |
| Returns: |
| Ranked items with predictive intervals |
| """ |
| try: |
| |
| data = [ |
| { |
| "id": item.id, |
| "positive_count": item.positive_count, |
| "negative_count": item.negative_count, |
| "neutral_count": item.neutral_count |
| } |
| for item in request.data |
| ] |
| |
| |
| ranked_data = rank_by_predictive_interval( |
| data, |
| confidence_level=request.confidence_level |
| ) |
| |
| |
| top_positive = get_top_positive_by_interval( |
| data, |
| top_k=10, |
| confidence_level=request.confidence_level |
| ) |
| top_negative = get_top_negative_by_interval( |
| data, |
| top_k=10, |
| confidence_level=request.confidence_level |
| ) |
| |
| return PredictiveIntervalResponse( |
| ranked_data=ranked_data, |
| top_positive=top_positive, |
| top_negative=top_negative |
| ) |
| except Exception as e: |
| logger.error(f"Error calculating predictive intervals: {e}") |
| raise HTTPException( |
| status_code=500, |
| detail=f"Predictive interval calculation failed: {str(e)}" |
| ) |
|
|
|
|
| @router.post("/category-sentiment", response_model=CategorySentimentResponse) |
| async def analyze_category_sentiment(request: CategorySentimentRequest): |
| """ |
| Analyze sentiment distribution across categories. |
| |
| Calculates sentiment statistics for each category and ranks them |
| using predictive intervals. |
| |
| Args: |
| request: Request with category and text data |
| |
| Returns: |
| Category sentiment statistics and rankings |
| """ |
| try: |
| analytics = get_category_analytics() |
| |
| |
| category_stats = analytics.analyze_category_sentiment( |
| request.data, |
| category_key="category", |
| text_key="text" |
| ) |
| |
| |
| top_positive = analytics.get_top_positive_categories( |
| category_stats, |
| top_k=10 |
| ) |
| top_negative = analytics.get_top_negative_categories( |
| category_stats, |
| top_k=10 |
| ) |
| |
| return CategorySentimentResponse( |
| category_stats=category_stats, |
| top_positive_categories=top_positive, |
| top_negative_categories=top_negative |
| ) |
| except Exception as e: |
| logger.error(f"Error analyzing category sentiment: {e}") |
| raise HTTPException( |
| status_code=500, |
| detail=f"Category sentiment analysis failed: {str(e)}" |
| ) |
|
|
|
|
| @router.post("/thread-analysis", response_model=ThreadAnalysisResponse) |
| async def analyze_thread_correlation(request: ThreadAnalysisRequest): |
| """ |
| Analyze correlation between thread length and sentiment temperature. |
| |
| Thread length is the number of comments under a news article. |
| Temperature is the probability that a comment is negative. |
| |
| Args: |
| request: Request with news_id and text data |
| |
| Returns: |
| Thread statistics and correlation analysis |
| """ |
| try: |
| analyzer = get_thread_analyzer() |
| |
| |
| thread_lengths = analyzer.calculate_thread_lengths( |
| request.data, |
| news_id_key="news_id" |
| ) |
| temperatures = analyzer.calculate_temperature( |
| request.data, |
| news_id_key="news_id", |
| text_key="text" |
| ) |
| |
| |
| correlation_results = analyzer.analyze_correlation( |
| thread_lengths, |
| temperatures |
| ) |
| |
| |
| def convert_to_python(obj): |
| """Recursively convert numpy types to Python native types.""" |
| import numpy as np |
| if isinstance(obj, np.integer): |
| return int(obj) |
| elif isinstance(obj, np.floating): |
| return float(obj) |
| elif isinstance(obj, np.bool_): |
| return bool(obj) |
| elif isinstance(obj, dict): |
| return {k: convert_to_python(v) for k, v in obj.items()} |
| elif isinstance(obj, (list, tuple)): |
| return [convert_to_python(item) for item in obj] |
| return obj |
| |
| correlation_results = convert_to_python(correlation_results) |
| |
| |
| common_ids = set(thread_lengths.keys()) & set(temperatures.keys()) |
| thread_stats = [ |
| { |
| "news_id": news_id, |
| "thread_length": int(thread_lengths[news_id]), |
| "temperature": float(temperatures[news_id]) |
| } |
| for news_id in common_ids |
| ] |
| |
| return ThreadAnalysisResponse( |
| thread_stats=thread_stats, |
| correlation=correlation_results |
| ) |
| except Exception as e: |
| logger.error(f"Error analyzing thread correlation: {e}") |
| raise HTTPException( |
| status_code=500, |
| detail=f"Thread analysis failed: {str(e)}" |
| ) |
|
|
|
|
| @router.get("/health") |
| async def analytics_health(): |
| """ |
| Health check for analytics service. |
| |
| Returns: |
| Status of analytics components |
| """ |
| try: |
| return { |
| "status": "healthy", |
| "category_analytics_loaded": category_analytics is not None, |
| "thread_analyzer_loaded": thread_analyzer is not None |
| } |
| except Exception as e: |
| logger.error(f"Error checking analytics health: {e}") |
| return { |
| "status": "unhealthy", |
| "error": str(e) |
| } |
|
|
|
|