"""
Analytics Service - Advanced analytics, business intelligence, and real-time insights.
"""
import logging
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any

from pydantic import BaseModel

logger = logging.getLogger(__name__)

# --- Enums and Dataclasses (from advanced_analytics) ---
class AnalyticsTimeframe(Enum):
    """Aggregation windows for analytics queries.

    Values are short period codes (e.g. "1H" = one hour, "3M" = one
    quarter); presumably pandas-style offset aliases — confirm with callers.
    """

    HOUR = "1H"
    DAY = "1D"
    WEEK = "1W"
    MONTH = "1M"
    QUARTER = "3M"
    YEAR = "1Y"
class MetricType(Enum):
    """Identifiers for the fraud/performance metrics tracked by the service."""

    FRAUD_AMOUNT = "fraud_amount"
    CASE_COUNT = "case_count"
    DETECTION_RATE = "detection_rate"
    FALSE_POSITIVE_RATE = "false_positive_rate"
    RESPONSE_TIME = "response_time"
    RECOVERY_RATE = "recovery_rate"
@dataclass
class AnalyticsInsight:
    """A single generated analytics insight with supporting evidence.

    Fix: the class had only bare annotations and no ``@dataclass``
    decorator, so instances carried none of these attributes; the
    ``dataclass`` import and the "Dataclasses" section comment show the
    decorator was intended.
    """

    title: str
    description: str
    impact_level: str  # "high", "medium", "low"
    confidence_score: float  # model confidence; presumably 0.0-1.0 — confirm
    recommended_actions: list[str]
    supporting_data: dict[str, Any]  # free-form evidence backing the insight
    generated_at: datetime
@dataclass
class PredictiveTrend:
    """Forecast for a single metric over a given time horizon.

    Fix: same as ``AnalyticsInsight`` — bare annotations without
    ``@dataclass`` left the class unconstructible with field values;
    the decorator was clearly intended (see the unused ``dataclass``
    import and the "Dataclasses" section comment).
    """

    metric: str
    current_value: float
    predicted_value: float
    trend_direction: str  # "increasing", "decreasing", "stable"
    confidence_interval: tuple[float, float]  # (low, high) bounds on the prediction
    time_horizon: str  # presumably an AnalyticsTimeframe-style code — confirm
    drivers: list[str]  # factors explaining the predicted movement
# --- Pydantic Models (from dashboard_analytics) ---
class InvestigationMetrics(BaseModel):
    """Aggregate KPI snapshot for investigations; every field defaults to zero."""

    # Case volume counters
    total_cases: int = 0
    active_cases: int = 0
    completed_cases: int = 0
    # Resolution time; units not shown here — presumably hours, confirm with callers
    average_resolution_time: float = 0.0
    # Rate fields; presumably percentages (0-100) to match the mock data
    # elsewhere in this module — confirm
    success_rate: float = 0.0
    fraud_detection_rate: float = 0.0
    false_positive_rate: float = 0.0
    ai_assist_rate: float = 0.0
    user_satisfaction_score: float = 0.0
    compliance_rate: float = 0.0
class PerformanceTrend(BaseModel):
    """One dated point in the performance time series kept in
    ``AnalyticsService.metrics_history`` and returned by the dashboard."""

    date: datetime
    total_cases: int
    resolution_time_avg: float
    # Rates populated as 0-100 percentages by _setup_initial_data's mock values
    success_rate: float
    ai_effectiveness: float
    fraud_prevention_rate: float
    false_positive_rate: float = 0.0  # optional; defaults when not supplied
class InvestigationInsight(BaseModel):
    """API-facing insight record (pydantic counterpart of AnalyticsInsight)."""

    id: str
    insight_type: str  # category tag; allowed values not shown in this module
    title: str
    description: str
    confidence_score: float
    impact_level: str  # "high", "medium", "low" per the sibling dataclass
    recommendations: list[str]
    created_at: datetime
# --- Unified Analytics Service ---
class AnalyticsService:
    """Unified service for analytics, insights and dashboarding.

    Keeps a 30-day in-memory history of mock ``PerformanceTrend`` points
    and exposes dashboard, heatmap, and SQL-aggregate helpers.
    """

    def __init__(self):
        # Rolling history of daily performance snapshots, oldest first.
        self.metrics_history: list[PerformanceTrend] = []
        self._setup_initial_data()

    def _setup_initial_data(self):
        """Seed 30 days of mock performance data for demonstration."""
        start = datetime.now() - timedelta(days=30)
        for offset in range(30):
            # Linear ramps produce visibly "improving" mock trends.
            snapshot = PerformanceTrend(
                date=start + timedelta(days=offset),
                total_cases=random.randint(15, 25),
                resolution_time_avg=72.0 - (offset * 0.5),
                success_rate=75.0 + (offset * 0.8),
                ai_effectiveness=offset * 1.2,
                fraud_prevention_rate=85.0 + (offset * 0.3),
                false_positive_rate=15.0 - (offset * 0.2),
            )
            self.metrics_history.append(snapshot)

    async def get_dashboard_data(self, time_range_days: int = 30) -> dict[str, Any]:
        """Return current metrics plus the trend window for the dashboard.

        Args:
            time_range_days: How far back (in days) to include history.

        Returns:
            Dict with ``current_metrics`` (latest snapshot plus a mock
            active-case count), ``trends`` (serialized history), and an
            ISO ``generated_at`` timestamp.
        """
        cutoff = datetime.now() - timedelta(days=time_range_days)
        window = [snap for snap in self.metrics_history if snap.date >= cutoff]
        if window:
            latest = window[-1]
        else:
            # Empty-window fallback: an all-zero snapshot keeps the
            # response shape stable for the dashboard.
            latest = PerformanceTrend(
                date=datetime.now(),
                total_cases=0,
                resolution_time_avg=0,
                success_rate=0,
                ai_effectiveness=0,
                fraud_prevention_rate=0,
            )
        return {
            "current_metrics": {
                "total_cases": latest.total_cases,
                "active_cases": random.randint(5, 10),  # mock value
                "resolution_time": latest.resolution_time_avg,
                "success_rate": latest.success_rate,
                "fraud_detection_rate": latest.fraud_prevention_rate,
                "ai_assist_rate": latest.ai_effectiveness,
            },
            # NOTE(review): .dict() is the pydantic v1 spelling (deprecated
            # alias in v2); switch to model_dump() if the project is v2-only.
            "trends": [snap.dict() for snap in window],
            "generated_at": datetime.now().isoformat(),
        }

    async def generate_risk_heatmaps(
        self, timeframe: AnalyticsTimeframe = AnalyticsTimeframe.MONTH
    ) -> dict[str, Any]:
        """Generate risk heatmaps for geographic and temporal analysis.

        NOTE(review): ``timeframe`` is currently unused — the payload is
        static/mock data regardless of the requested window.
        """
        region_rows = [
            ("North America", 2.1, 45, 850000),
            ("Europe", 1.8, 38, 720000),
            ("Asia Pacific", 2.4, 52, 980000),
        ]
        regions = [
            {"name": name, "risk_score": score, "cases": cases, "amount": amount}
            for name, score, cases, amount in region_rows
        ]
        return {
            "geographic": {"regions": regions},
            "temporal": {
                "hourly_patterns": [random.uniform(1, 5) for _ in range(24)],
                "weekly_patterns": [random.uniform(2, 4) for _ in range(7)],
            },
        }

    def get_case_analytics(
        self, db, date_from: datetime | None = None, date_to: datetime | None = None
    ) -> dict[str, Any]:
        """Return case counts (total/open/closed/critical) with optional
        creation-date filtering, via a single aggregate query."""
        # Imported locally to keep module import light / avoid cycles.
        from sqlalchemy import case, func

        from core.database import Case

        stats = db.query(
            func.count().label("total"),
            func.sum(case((Case.status == "open", 1), else_=0)).label("open"),
            func.sum(case((Case.status == "closed", 1), else_=0)).label("closed"),
            func.sum(case((Case.priority == "critical", 1), else_=0)).label("critical"),
        )
        if date_from is not None:
            stats = stats.filter(Case.created_at >= date_from)
        if date_to is not None:
            stats = stats.filter(Case.created_at <= date_to)
        row = stats.one()
        # SUM over zero rows yields NULL -> None; coalesce to 0 for the API.
        return {
            "total_cases": row.total or 0,
            "open_cases": row.open or 0,
            "closed_cases": row.closed or 0,
            "critical_cases": row.critical or 0,
            "date_from": date_from.isoformat() if date_from else None,
            "date_to": date_to.isoformat() if date_to else None,
        }

    def get_transaction_aggregates(
        self,
        db,
        case_id: str | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> dict[str, Any]:
        """Return count/sum/avg/max of transaction amounts, optionally
        filtered by case and transaction-date range."""
        from sqlalchemy import func

        from core.database import Transaction

        stats = db.query(
            func.count(Transaction.id).label("count"),
            func.sum(Transaction.amount).label("total_amount"),
            func.avg(Transaction.amount).label("avg_amount"),
            func.max(Transaction.amount).label("max_amount"),
        )
        if case_id is not None:
            stats = stats.filter(Transaction.case_id == case_id)
        if date_from is not None:
            stats = stats.filter(Transaction.date >= date_from)
        if date_to is not None:
            stats = stats.filter(Transaction.date <= date_to)
        row = stats.one()
        # Aggregates over zero rows are None; normalize to numeric zeros.
        return {
            "transaction_count": row.count or 0,
            "total_amount": float(row.total_amount or 0),
            "average_amount": float(row.avg_amount or 0),
            "max_amount": float(row.max_amount or 0),
            "case_id": case_id,
            "date_from": date_from.isoformat() if date_from else None,
            "date_to": date_to.isoformat() if date_to else None,
        }
# Singleton
# Module-level shared instance; seeds 30 days of mock history on import.
analytics_service = AnalyticsService()

# For backward compatibility with advanced_analytics.py imports
advanced_analytics = analytics_service