Spaces:
Sleeping
Sleeping
| """Data analysis service""" | |
| import logging | |
| from typing import Dict, List, Any | |
| import pandas as pd | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| class Analyzer: | |
| """Service for analyzing data""" | |
| def __init__(self): | |
| self.supported_types = [ | |
| "statistical_summary", | |
| "trend_detection", | |
| "outlier_detection", | |
| "correlation_analysis" | |
| ] | |
| async def analyze( | |
| self, | |
| data: List[Dict], | |
| analysis_type: str, | |
| columns: List[str] = None | |
| ) -> Dict[str, Any]: | |
| """Perform data analysis""" | |
| logger.info(f"π Starting analysis: {analysis_type}") | |
| # Validate analysis type | |
| if analysis_type not in self.supported_types: | |
| raise ValueError( | |
| f"Unknown analysis type: {analysis_type}. " | |
| f"Supported types: {', '.join(self.supported_types)}" | |
| ) | |
| # Convert to DataFrame | |
| df = pd.DataFrame(data) | |
| # Select columns if specified | |
| if columns: | |
| numeric_columns = [col for col in columns if col in df.columns] | |
| else: | |
| numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist() | |
| logger.info(f"Analyzing columns: {numeric_columns}") | |
| # Route to appropriate analysis | |
| if analysis_type == "statistical_summary": | |
| return await self._statistical_summary(df, numeric_columns) | |
| elif analysis_type == "trend_detection": | |
| return await self._trend_detection(df, numeric_columns) | |
| elif analysis_type == "outlier_detection": | |
| return await self._outlier_detection(df, numeric_columns) | |
| elif analysis_type == "correlation_analysis": | |
| return await self._correlation_analysis(df, numeric_columns) | |
| async def _statistical_summary(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]: | |
| """Generate statistical summary""" | |
| try: | |
| results = {} | |
| for col in columns: | |
| if pd.api.types.is_numeric_dtype(df[col]): | |
| results[col] = { | |
| "mean": float(df[col].mean()), | |
| "median": float(df[col].median()), | |
| "std": float(df[col].std()), | |
| "min": float(df[col].min()), | |
| "max": float(df[col].max()), | |
| "count": int(df[col].count()) | |
| } | |
| logger.info(f"β Statistical summary complete for {len(results)} columns") | |
| return { | |
| "type": "statistical_summary", | |
| "results": results, | |
| "rows_analyzed": len(df) | |
| } | |
| except Exception as e: | |
| logger.error(f"β Statistical summary failed: {e}") | |
| raise | |
| async def _trend_detection(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]: | |
| """Detect trends in data""" | |
| try: | |
| trends = {} | |
| for col in columns: | |
| if pd.api.types.is_numeric_dtype(df[col]): | |
| values = df[col].dropna().values | |
| if len(values) > 1: | |
| # Simple trend: compare first half vs second half | |
| mid = len(values) // 2 | |
| first_half_mean = np.mean(values[:mid]) | |
| second_half_mean = np.mean(values[mid:]) | |
| if second_half_mean > first_half_mean: | |
| trend = "increasing" | |
| trend_strength = ((second_half_mean - first_half_mean) / first_half_mean * 100) if first_half_mean != 0 else 0 | |
| elif second_half_mean < first_half_mean: | |
| trend = "decreasing" | |
| trend_strength = ((first_half_mean - second_half_mean) / first_half_mean * 100) if first_half_mean != 0 else 0 | |
| else: | |
| trend = "stable" | |
| trend_strength = 0 | |
| trends[col] = { | |
| "trend": trend, | |
| "strength": float(trend_strength), | |
| "first_half_avg": float(first_half_mean), | |
| "second_half_avg": float(second_half_mean) | |
| } | |
| logger.info(f"β Trend detection complete for {len(trends)} columns") | |
| return { | |
| "type": "trend_detection", | |
| "results": trends, | |
| "rows_analyzed": len(df) | |
| } | |
| except Exception as e: | |
| logger.error(f"β Trend detection failed: {e}") | |
| raise | |
| async def _outlier_detection(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]: | |
| """Detect outliers in data""" | |
| try: | |
| outliers = {} | |
| for col in columns: | |
| if pd.api.types.is_numeric_dtype(df[col]): | |
| Q1 = df[col].quantile(0.25) | |
| Q3 = df[col].quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| outlier_mask = (df[col] < lower_bound) | (df[col] > upper_bound) | |
| outlier_count = outlier_mask.sum() | |
| outlier_indices = df[outlier_mask].index.tolist() | |
| outliers[col] = { | |
| "count": int(outlier_count), | |
| "percentage": float(outlier_count / len(df) * 100), | |
| "lower_bound": float(lower_bound), | |
| "upper_bound": float(upper_bound), | |
| "outlier_indices": outlier_indices[:10] # First 10 | |
| } | |
| logger.info(f"β Outlier detection complete for {len(outliers)} columns") | |
| return { | |
| "type": "outlier_detection", | |
| "results": outliers, | |
| "rows_analyzed": len(df) | |
| } | |
| except Exception as e: | |
| logger.error(f"β Outlier detection failed: {e}") | |
| raise | |
| async def _correlation_analysis(self, df: pd.DataFrame, columns: List[str]) -> Dict[str, Any]: | |
| """Analyze correlations between columns""" | |
| try: | |
| # Get numeric data | |
| numeric_df = df[columns].select_dtypes(include=[np.number]) | |
| if len(numeric_df.columns) < 2: | |
| return { | |
| "type": "correlation_analysis", | |
| "results": {}, | |
| "message": "Need at least 2 numeric columns for correlation analysis", | |
| "rows_analyzed": len(df) | |
| } | |
| # Calculate correlation matrix | |
| corr_matrix = numeric_df.corr() | |
| # Find strong correlations | |
| strong_correlations = [] | |
| for i in range(len(corr_matrix.columns)): | |
| for j in range(i+1, len(corr_matrix.columns)): | |
| col_i = corr_matrix.columns[i] | |
| col_j = corr_matrix.columns[j] | |
| corr_value = corr_matrix.iloc[i, j] | |
| if abs(corr_value) > 0.5: # Strong correlation threshold | |
| strong_correlations.append({ | |
| "column_1": col_i, | |
| "column_2": col_j, | |
| "correlation": float(corr_value) | |
| }) | |
| logger.info(f"β Correlation analysis complete with {len(strong_correlations)} strong correlations") | |
| return { | |
| "type": "correlation_analysis", | |
| "results": { | |
| "strong_correlations": strong_correlations, | |
| "correlation_matrix": corr_matrix.to_dict() | |
| }, | |
| "rows_analyzed": len(df) | |
| } | |
| except Exception as e: | |
| logger.error(f"β Correlation analysis failed: {e}") | |
| raise | |
| # Global analyzer instance | |
| analyzer = Analyzer() | |