| | import pandas as pd |
| | import numpy as np |
| | from prophet import Prophet |
| | from datetime import datetime |
| | import redis |
| | import json |
| | from sklearn.cluster import KMeans, DBSCAN |
| | from sklearn.preprocessing import StandardScaler, MinMaxScaler |
| | from sklearn.decomposition import PCA |
| | from sklearn.ensemble import IsolationForest |
| | from .json_utils import CustomJSONEncoder |
| | from scipy import stats |
| | from scipy.stats import pearsonr |
| | from statsmodels.tsa.seasonal import seasonal_decompose |
| | from statsmodels.tsa.stattools import adfuller |
| | import networkx as nx |
| | from sklearn.metrics import silhouette_score |
| | from sklearn.feature_extraction.text import TfidfVectorizer |
| | from .supermarket_metrics import supermarket_insights |
| | from app.utils.detect_industry import is_supermarket |
| |
|
| | class AnalyticsService: |
    def __init__(self):
        """Set up the Redis cache client and the analyzer dispatch tables."""
        # Local Redis instance; forecast_timeseries caches hourly results here.
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        # Industry name -> industry-specific metric calculator (see perform_eda).
        self.industry_metrics = {
            'retail': self._retail_metrics,
            'wholesale': self._wholesale_metrics,
            'supermarket': self._supermarket_metrics,
            'manufacturing': self._manufacturing_metrics,
            'healthcare': self._healthcare_metrics
        }
        # Analyzers run for every dataset regardless of industry.
        self.cross_industry_analyzers = {
            'market_dynamics': self._analyze_market_dynamics,
            'supply_chain': self._analyze_supply_chain,
            'customer_insights': self._analyze_customer_insights,
            'operational_efficiency': self._analyze_operational_efficiency,
            'risk_assessment': self._analyze_risk_patterns,
            'sustainability': self._analyze_sustainability_metrics
        }
| | |
| | def perform_eda(self, data, industry=None): |
| | """ |
| | Perform enhanced Exploratory Data Analysis with cross-industry insights |
| | """ |
| | if not data: |
| | raise ValueError("Empty dataset provided") |
| | |
| | df = pd.DataFrame(data) |
| |
|
| | if df.empty: |
| | raise ValueError("Empty dataset provided") |
| | |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) == 0: |
| | raise ValueError("Non-numeric values found in dataset") |
| | |
| | |
| | date_columns = [] |
| | for col in df.columns: |
| | if df[col].dtype == 'object': |
| | try: |
| | df[col] = pd.to_datetime(df[col]) |
| | date_columns.append(col) |
| | except (ValueError, TypeError): |
| | continue |
| | |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | |
| | |
| | analysis_results = { |
| | 'basic_stats': df[numeric_cols].describe().to_dict() if len(numeric_cols) > 0 else {}, |
| | 'missing_values': df.isnull().sum().to_dict(), |
| | 'columns': list(df.columns), |
| | 'row_count': len(df), |
| | 'correlation_matrix': df[numeric_cols].corr().to_dict() if len(numeric_cols) > 0 else {}, |
| | 'skewness': df[numeric_cols].skew().to_dict() if len(numeric_cols) > 0 else {}, |
| | 'kurtosis': df[numeric_cols].kurtosis().to_dict() if len(numeric_cols) > 0 else {}, |
| | 'outliers': self._detect_outliers(df), |
| | 'distribution_tests': self._perform_distribution_tests(df), |
| | 'dimensionality_reduction': self._perform_dimensionality_reduction(df), |
| | 'temporal_patterns': self._analyze_temporal_patterns(df), |
| | 'anomaly_detection': self._detect_anomalies(df), |
| | 'feature_importance': self._calculate_feature_importance(df) |
| | } |
| | |
| | if is_supermarket(df): |
| | industry = 'supermarket' |
| | results['supermarket_kpis'] = supermarket_insights(df) |
| | |
| | if industry and industry.lower() in self.industry_metrics: |
| | analysis_results['industry_metrics'] = self.industry_metrics[industry.lower()](df) |
| | |
| | |
| | analysis_results['cross_industry_insights'] = {} |
| | for analyzer_name, analyzer_func in self.cross_industry_analyzers.items(): |
| | analysis_results['cross_industry_insights'][analyzer_name] = analyzer_func(df) |
| | |
| | return analysis_results |
| |
|
| | def _detect_outliers(self, df): |
| | """ |
| | Detect outliers using IQR method for numerical columns |
| | """ |
| | outliers = {} |
| | for column in df.select_dtypes(include=[np.number]).columns: |
| | Q1 = df[column].quantile(0.25) |
| | Q3 = df[column].quantile(0.75) |
| | IQR = Q3 - Q1 |
| | outliers[column] = { |
| | 'count': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]), |
| | 'percentage': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]) / len(df) * 100 |
| | } |
| | return outliers |
| | |
| | def _perform_distribution_tests(self, df): |
| | """ |
| | Perform distribution tests for numerical columns |
| | """ |
| | tests = {} |
| | for column in df.select_dtypes(include=[np.number]).columns: |
| | shapiro_test = stats.shapiro(df[column].dropna()) |
| | tests[column] = { |
| | 'shapiro_test': { |
| | 'statistic': float(shapiro_test.statistic), |
| | 'p_value': float(shapiro_test.pvalue) |
| | } |
| | } |
| | return tests |
| |
|
| | def _perform_dimensionality_reduction(self, df): |
| | """ |
| | Perform PCA for dimensional insights |
| | """ |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) < 2: |
| | return {} |
| | |
| | scaler = StandardScaler() |
| | scaled_data = scaler.fit_transform(df[numeric_cols]) |
| | pca = PCA() |
| | pca_result = pca.fit_transform(scaled_data) |
| | |
| | return { |
| | 'explained_variance_ratio': pca.explained_variance_ratio_.tolist(), |
| | 'cumulative_variance_ratio': np.cumsum(pca.explained_variance_ratio_).tolist(), |
| | 'n_components_95_variance': np.argmax(np.cumsum(pca.explained_variance_ratio_) >= 0.95) + 1 |
| | } |
| |
|
| | def _analyze_temporal_patterns(self, df): |
| | """ |
| | Analyze temporal patterns and seasonality |
| | """ |
| | date_cols = df.select_dtypes(include=['datetime64']).columns |
| | if len(date_cols) == 0: |
| | return None |
| | |
| | patterns = {} |
| | for date_col in date_cols: |
| | df['year'] = df[date_col].dt.year |
| | df['month'] = df[date_col].dt.month |
| | df['day_of_week'] = df[date_col].dt.dayofweek |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | for metric in numeric_cols: |
| | if metric not in ['year', 'month', 'day_of_week']: |
| | patterns[f"{metric}_by_month"] = df.groupby('month')[metric].mean().to_dict() |
| | patterns[f"{metric}_by_day_of_week"] = df.groupby('day_of_week')[metric].mean().to_dict() |
| | |
| | return patterns |
| |
|
| | def _detect_anomalies(self, df): |
| | """ |
| | Detect anomalies using multiple methods |
| | """ |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) == 0: |
| | return None |
| | |
| | scaler = StandardScaler() |
| | scaled_data = scaler.fit_transform(df[numeric_cols]) |
| | |
| | isolation_forest = IsolationForest(random_state=42, contamination=0.1) |
| | anomalies = isolation_forest.fit_predict(scaled_data) |
| | |
| | return { |
| | 'anomaly_percentage': float((anomalies == -1).mean() * 100), |
| | 'anomaly_indices': np.where(anomalies == -1)[0].tolist() |
| | } |
| |
|
| | def _calculate_feature_importance(self, df): |
| | """ |
| | Calculate feature importance and relationships |
| | """ |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) < 2: |
| | return None |
| | |
| | importance = {} |
| | for col in numeric_cols: |
| | correlations = [] |
| | for other_col in numeric_cols: |
| | if col != other_col: |
| | |
| | if df[col].nunique() <= 1 or df[other_col].nunique() <= 1: |
| | continue |
| | try: |
| | corr, _ = pearsonr(df[col].fillna(0), df[other_col].fillna(0)) |
| | if not np.isnan(corr): |
| | correlations.append((other_col, abs(corr))) |
| | except ValueError: |
| | continue |
| | |
| | |
| | correlation_values = [abs(c[1]) for c in correlations] |
| | importance[col] = { |
| | 'top_correlations': sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3], |
| | 'correlation_strength': float(np.mean(correlation_values)) if correlation_values else 0.0 |
| | } |
| | |
| | return importance |
| |
|
| | def _retail_metrics(self, df): |
| |
|
| | """Calculate retail-specific metrics""" |
| | if not all(col in df.columns for col in ['sales', 'inventory', 'customer_satisfaction']): |
| | |
| | return { |
| | 'sales_performance': {}, |
| | 'customer_behavior': {}, |
| | 'inventory': {} |
| | } |
| |
|
| | metrics = { |
| | 'sales_performance': { |
| | 'total_sales': float(df['sales'].sum()) if 'sales' in df.columns else 0.0, |
| | 'average_daily_sales': float(df['sales'].mean()) if 'sales' in df.columns else 0.0, |
| | 'sales_growth': float((df['sales'].iloc[-1] / df['sales'].iloc[0] - 1) * 100) if 'sales' in df.columns else 0.0 |
| | }, |
| | 'inventory_turnover': { |
| | 'rate': float(df['sales'].sum() / df['inventory'].mean()) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0, |
| | 'days_of_inventory': float(df['inventory'].mean() / (df['sales'].mean() / 30)) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0 |
| | }, |
| | 'customer_metrics': { |
| | 'satisfaction_score': float(df['customer_satisfaction'].mean()) if 'customer_satisfaction' in df.columns else 0.0, |
| | 'satisfaction_trend': df['customer_satisfaction'].rolling(window=7).mean().to_dict() if 'customer_satisfaction' in df.columns else {} |
| | } |
| | } |
| | return metrics |
| |
|
| | def _wholesale_metrics(self, df): |
| | """ |
| | Calculate wholesale-specific metrics |
| | """ |
| | metrics = { |
| | 'order_analytics': {}, |
| | 'supplier_performance': {}, |
| | 'distribution': {} |
| | } |
| | |
| | if 'order_value' in df.columns: |
| | metrics['order_analytics']['average_order_value'] = float(df['order_value'].mean()) |
| | metrics['order_analytics']['order_value_distribution'] = df['order_value'].quantile([0.25, 0.5, 0.75]).to_dict() |
| | |
| | if 'supplier_id' in df.columns and 'delivery_time' in df.columns: |
| | supplier_performance = df.groupby('supplier_id')['delivery_time'].agg(['mean', 'std']).to_dict() |
| | metrics['supplier_performance'] = supplier_performance |
| | |
| | return metrics |
| | |
| | def _supermarket_metrics(self, df): |
| | """ |
| | Calculate supermarket-specific metrics |
| | """ |
| | metrics = { |
| | 'category_performance': {}, |
| | 'basket_analysis': {}, |
| | 'promotion_impact': {} |
| | } |
| | |
| | if 'category' in df.columns and 'sales_amount' in df.columns: |
| | category_sales = df.groupby('category')['sales_amount'].sum() |
| | metrics['category_performance']['top_categories'] = category_sales.nlargest(5).to_dict() |
| | |
| | if 'transaction_id' in df.columns and 'product_id' in df.columns: |
| | |
| | transactions = df.groupby('transaction_id')['product_id'].count() |
| | metrics['basket_analysis']['average_items_per_transaction'] = float(transactions.mean()) |
| | |
| | if 'promotion_flag' in df.columns and 'sales_amount' in df.columns: |
| | promo_impact = df.groupby('promotion_flag')['sales_amount'].mean() |
| | metrics['promotion_impact']['sales_lift'] = float( |
| | (promo_impact.get(1, 0) - promo_impact.get(0, 0)) / promo_impact.get(0, 1) * 100 |
| | ) |
| | |
| | return metrics |
| | |
| | def _manufacturing_metrics(self, df): |
| |
|
| |
|
| | """Calculate manufacturing-specific metrics""" |
| | production_col = 'production_volume' if 'production_volume' in df.columns else 'units_produced' |
| | metrics = { |
| | 'production_efficiency': { |
| | 'volume': float(df[production_col].mean()), |
| | 'trend': df[production_col].rolling(window=7).mean().to_dict() |
| | }, |
| | 'quality_metrics': { |
| | 'defect_rate': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, |
| | 'quality_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} |
| | }, |
| | 'quality_control': { |
| | 'defects_per_unit': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, |
| | 'defect_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} |
| | }, |
| | 'equipment_utilization': { |
| | 'rate': float((df[production_col] / df[production_col].max()).mean() * 100), |
| | 'trend': df[production_col].rolling(window=7).mean().to_dict() |
| | } |
| | } |
| | return metrics |
| | |
| | def _healthcare_metrics(self, df): |
| |
|
| | """Calculate healthcare-specific metrics""" |
| | metrics = { |
| | 'patient_outcomes': { |
| | 'satisfaction': float(df['patient_satisfaction'].mean()), |
| | 'treatment_success': float(df['treatment_success_rate'].mean()) |
| | }, |
| | 'operational_efficiency': { |
| | 'avg_wait_time': float(df['order_fulfillment_time'].mean()), |
| | 'utilization_rate': float(df['production_volume'].mean() / df['production_volume'].max()) |
| | }, |
| | 'quality_of_care': { |
| | 'satisfaction_trend': df['patient_satisfaction'].rolling(window=7).mean().to_dict(), |
| | 'success_rate_trend': df['treatment_success_rate'].rolling(window=7).mean().to_dict() |
| | } |
| | } |
| | return metrics |
| | |
| | def forecast_timeseries(self, data, date_column, value_column): |
| | """ |
| | Forecast time series data with support for edge cases |
| | """ |
| | if not data: |
| | raise ValueError("Empty dataset provided") |
| | |
| | df = pd.DataFrame(data) |
| | if date_column not in df.columns: |
| | raise KeyError(f"Required column '{date_column}' not found") |
| | if value_column not in df.columns: |
| | raise KeyError(f"Required column '{value_column}' not found") |
| | |
| | |
| | try: |
| | df[date_column] = pd.to_datetime(df[date_column]) |
| | except ValueError as exc: |
| | raise ValueError("Invalid date format") from exc |
| | |
| | |
| | has_missing = df[value_column].isnull().any() |
| | if has_missing: |
| | df[value_column] = df[value_column].interpolate(method='linear') |
| | |
| | |
| | Q1 = df[value_column].quantile(0.25) |
| | Q3 = df[value_column].quantile(0.75) |
| | IQR = Q3 - Q1 |
| | outlier_mask = (df[value_column] < (Q1 - 1.5 * IQR)) | (df[value_column] > (Q3 + 1.5 * IQR)) |
| | has_outliers = outlier_mask.any() |
| | |
| | |
| | prophet_df = df.rename(columns={date_column: 'ds', value_column: 'y'}) |
| | model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True) |
| | model.fit(prophet_df) |
| | |
| | |
| | future = model.make_future_dataframe(periods=30) |
| | forecast = model.predict(future) |
| | |
| | result = { |
| | 'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_dict('records'), |
| | 'components': { |
| | 'trend': forecast['trend'].to_dict(), |
| | 'yearly': forecast['yearly'].to_dict() if 'yearly' in forecast else {}, |
| | 'weekly': forecast['weekly'].to_dict() if 'weekly' in forecast else {}, |
| | 'daily': forecast['daily'].to_dict() if 'daily' in forecast else {} |
| | } |
| | } |
| | |
| | if has_missing: |
| | result['handling_missing_values'] = {'filled_indices': df[value_column].isnull().sum()} |
| | |
| | if has_outliers: |
| | result['outlier_impact'] = { |
| | 'outlier_indices': outlier_mask[outlier_mask].index.tolist(), |
| | 'outlier_values': df.loc[outlier_mask, value_column].tolist() |
| | } |
| | |
| | |
| | decomposition = seasonal_decompose(df[value_column], period=7, extrapolate_trend='freq') |
| | result['seasonality_components'] = { |
| | 'trend': decomposition.trend.to_dict(), |
| | 'seasonal': decomposition.seasonal.to_dict(), |
| | 'residual': decomposition.resid.to_dict() |
| | } |
| | |
| |
|
| |
|
| |
|
| | |
| | timestamp = datetime.now().strftime('%Y%m%d%H') |
| | cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" |
| | self.redis_client.set(cache_key, json.dumps(result, cls=CustomJSONEncoder)) |
| | |
| | return result |
| | |
| | def get_cached_forecast(self, date_column, value_column): |
| | """ |
| | Retrieve cached forecast results |
| | """ |
| | timestamp = datetime.now().strftime('%Y%m%d%H') |
| | cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" |
| | cached = self.redis_client.get(cache_key) |
| | |
| | if cached: |
| | return json.loads(cached) |
| | return None |
| |
|
| | def _analyze_market_dynamics(self, df): |
| | """ |
| | Analyze market dynamics across industries |
| | """ |
| | metrics = { |
| | 'market_trends': {}, |
| | 'competitive_analysis': {}, |
| | 'growth_patterns': {} |
| | } |
| | |
| | if 'revenue' in df.columns and 'date' in df.columns: |
| | |
| | df['month'] = pd.to_datetime(df['date']).dt.to_period('M') |
| | monthly_revenue = df.groupby('month')['revenue'].sum() |
| | |
| | |
| | metrics['growth_patterns']['monthly_growth'] = float( |
| | ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) ** (1/len(monthly_revenue)) - 1) * 100 |
| | ) |
| | |
| | |
| | mean_revenue = monthly_revenue.mean() |
| | if mean_revenue > 0: |
| | metrics['market_trends']['volatility'] = float(monthly_revenue.std() / mean_revenue) |
| | else: |
| | metrics['market_trends']['volatility'] = 0.0 |
| | |
| | if 'competitor_price' in df.columns and 'price' in df.columns: |
| |
|
| | comp_price_mean = df['competitor_price'].mean() |
| | if comp_price_mean > 0: |
| | metrics['competitive_analysis']['price_position'] = float( |
| | (df['price'].mean() / comp_price_mean - 1) * 100 |
| | ) |
| | else: |
| | metrics['competitive_analysis']['price_position'] = 0.0 |
| | |
| | return metrics |
| |
|
| | def _analyze_supply_chain(self, df): |
| | """ |
| | Analyze supply chain metrics across industries |
| | """ |
| | metrics = { |
| | 'efficiency': {}, |
| | 'reliability': {}, |
| | 'cost_analysis': {} |
| | } |
| | |
| | |
| | if 'supplier_id' in df.columns and 'delivery_time' in df.columns: |
| | supplier_performance = df.groupby('supplier_id').agg({ |
| | 'delivery_time': ['mean', 'std'], |
| | 'order_value': ['sum', 'mean'] |
| | }).round(2) |
| | |
| | metrics['reliability']['supplier_consistency'] = float( |
| | 1 - (supplier_performance['delivery_time']['std'] / supplier_performance['delivery_time']['mean']).mean() |
| | ) |
| | |
| | |
| | if 'transportation_cost' in df.columns and 'order_value' in df.columns: |
| | metrics['cost_analysis']['logistics_cost_ratio'] = float( |
| | (df['transportation_cost'].sum() / df['order_value'].sum()) * 100 |
| | ) |
| | |
| | return metrics |
| |
|
| | def _analyze_customer_insights(self, df): |
| | """ |
| | Cross-industry customer behavior analysis |
| | """ |
| | insights = { |
| | 'customer_segments': {}, |
| | 'behavior_patterns': {}, |
| | 'lifetime_value': {} |
| | } |
| | |
| | if 'customer_id' in df.columns and 'transaction_amount' in df.columns: |
| | |
| | customer_features = df.groupby('customer_id').agg({ |
| | 'transaction_amount': ['sum', 'mean', 'count'] |
| | }).values |
| | |
| | scaler = MinMaxScaler() |
| | scaled_features = scaler.fit_transform(customer_features) |
| | |
| | |
| | dbscan = DBSCAN(eps=0.3, min_samples=5) |
| | clusters = dbscan.fit_predict(scaled_features) |
| | |
| | insights['customer_segments']['natural_segments'] = { |
| | 'n_segments': len(np.unique(clusters[clusters >= 0])), |
| | 'segment_sizes': pd.Series(clusters).value_counts().to_dict() |
| | } |
| | |
| | return insights |
| |
|
| | def _analyze_operational_efficiency(self, df): |
| | """ |
| | Cross-industry operational efficiency analysis |
| | """ |
| | metrics = { |
| | 'process_efficiency': {}, |
| | 'resource_utilization': {}, |
| | 'bottleneck_analysis': {} |
| | } |
| | |
| | if 'process_time' in df.columns and 'output_quantity' in df.columns: |
| | |
| | metrics['process_efficiency']['throughput_rate'] = float( |
| | df['output_quantity'].sum() / df['process_time'].sum() |
| | ) |
| | |
| | |
| | process_stability = 1 - (df['process_time'].std() / df['process_time'].mean()) |
| | metrics['process_efficiency']['stability_score'] = float(process_stability) |
| | |
| | return metrics |
| |
|
| | def _analyze_risk_patterns(self, df): |
| | """ |
| | Cross-industry risk pattern analysis |
| | """ |
| | risk_metrics = { |
| | 'operational_risk': {}, |
| | 'market_risk': {}, |
| | 'compliance_risk': {} |
| | } |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) > 0: |
| | |
| | iso_forest = IsolationForest(contamination=0.1, random_state=42) |
| | risk_scores = iso_forest.fit_predict(df[numeric_cols]) |
| | |
| | risk_metrics['operational_risk']['anomaly_percentage'] = float( |
| | (risk_scores == -1).mean() * 100 |
| | ) |
| | |
| | return risk_metrics |
| |
|
| | def _analyze_sustainability_metrics(self, df): |
| | """ |
| | |
| | Analyze sustainability metrics including environmental impact, resource utilization, and waste management |
| | """ |
| | if not all(col in df.columns for col in ['energy_consumption', 'water_consumption', 'waste_generated']): |
| | return {} |
| | |
| | results = { |
| | 'environmental_impact': { |
| | 'carbon_footprint_trend': df['carbon_footprint'].rolling(window=7).mean().to_dict() if 'carbon_footprint' in df.columns else {}, |
| | 'total_emissions': float(df['energy_consumption'].sum() * 0.5) |
| | }, |
| | 'resource_utilization': { |
| | 'energy_efficiency': float(df['energy_consumption'].mean()), |
| | 'water_efficiency': float(df['water_consumption'].mean()) |
| | }, |
| | 'waste_management': { |
| | 'recycling_performance': float(df['recycling_rate'].mean()) if 'recycling_rate' in df.columns else 0.0, |
| | 'waste_reduction_trend': df['waste_generated'].rolling(window=7).mean().to_dict() |
| | } |
| | } |
| | return results |
| |
|
| | def prepare_ai_query_interface(self, df): |
| | """ |
| | Prepare data for natural language analytics queries with enhanced semantic understanding |
| | """ |
| | query_interface = { |
| | 'semantic_mappings': {}, |
| | 'entity_relationships': {}, |
| | 'available_metrics': {}, |
| | 'temporal_context': {}, |
| | 'metric_relationships': {}, |
| | 'data_patterns': {}, |
| | 'suggested_queries': [] |
| | } |
| | |
| | try: |
| | |
| | text_columns = df.select_dtypes(include=['object']).columns |
| | vectorizer = TfidfVectorizer(max_features=1000) |
| | |
| | for col in text_columns: |
| | if df[col].str.len().mean() > 5: |
| | text_features = vectorizer.fit_transform(df[col].fillna('').astype(str)) |
| | query_interface['semantic_mappings'][col] = { |
| | 'vocabulary': vectorizer.vocabulary_, |
| | 'idf_values': vectorizer.idf_.tolist(), |
| | 'top_terms': dict(zip( |
| | vectorizer.get_feature_names_out(), |
| | np.asarray(text_features.sum(axis=0)).ravel() |
| | )) |
| | } |
| | |
| | |
| | entity_columns = [col for col in df.columns if any(entity in col.lower() |
| | for entity in ['id', 'category', 'type', 'name', 'class', 'group'])] |
| | |
| | for col in entity_columns: |
| | if df[col].dtype == 'object': |
| | value_counts = df[col].value_counts() |
| | unique_values = df[col].unique().tolist() |
| | |
| | |
| | hierarchy = {} |
| | if '_' in col or col.lower().endswith('_id'): |
| | related_cols = [c for c in df.columns if col.split('_')[0] in c and c != col] |
| | for rel_col in related_cols: |
| | hierarchy[rel_col] = df.groupby(col)[rel_col].agg(list).to_dict() |
| | |
| | query_interface['entity_relationships'][col] = { |
| | 'unique_values': unique_values, |
| | 'value_counts': value_counts.to_dict(), |
| | 'hierarchy': hierarchy, |
| | 'cardinality': len(unique_values) |
| | } |
| | |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | for col in numeric_cols: |
| | stats = df[col].describe() |
| | query_interface['available_metrics'][col] = { |
| | 'min': float(stats['min']), |
| | 'max': float(stats['max']), |
| | 'mean': float(stats['mean']), |
| | 'std': float(stats['std']), |
| | 'quartiles': { |
| | '25%': float(stats['25%']), |
| | '50%': float(stats['50%']), |
| | '75%': float(stats['75%']) |
| | } |
| | } |
| | |
| | |
| | correlations = {} |
| | for other_col in numeric_cols: |
| | if col != other_col: |
| | corr = df[col].corr(df[other_col]) |
| | if abs(corr) > 0.3: |
| | correlations[other_col] = float(corr) |
| | |
| | query_interface['metric_relationships'][col] = { |
| | 'correlations': correlations, |
| | 'trends': self._analyze_metric_trends(df, col) |
| | } |
| | |
| | |
| | date_cols = df.select_dtypes(include=['datetime64']).columns |
| | if len(date_cols) == 0: |
| | |
| | for col in df.columns: |
| | if df[col].dtype == 'object': |
| | try: |
| | pd.to_datetime(df[col]) |
| | date_cols = date_cols.append(col) |
| | except: |
| | continue |
| | |
| | for date_col in date_cols: |
| | df[date_col] = pd.to_datetime(df[date_col]) |
| | temporal_stats = { |
| | 'min_date': df[date_col].min().isoformat(), |
| | 'max_date': df[date_col].max().isoformat(), |
| | 'frequency': pd.infer_freq(df[date_col]), |
| | 'temporal_patterns': {} |
| | } |
| | |
| | |
| | temporal_stats['temporal_patterns'] = { |
| | 'daily_pattern': df.groupby(df[date_col].dt.dayofweek).size().to_dict(), |
| | 'monthly_pattern': df.groupby(df[date_col].dt.month).size().to_dict(), |
| | 'yearly_pattern': df.groupby(df[date_col].dt.year).size().to_dict() |
| | } |
| | |
| | query_interface['temporal_context'][date_col] = temporal_stats |
| | |
| | |
| | query_interface['data_patterns'] = { |
| | 'missing_patterns': df.isnull().sum().to_dict(), |
| | 'unique_value_counts': df.nunique().to_dict(), |
| | 'distribution_types': self._analyze_distributions(df) |
| | } |
| | |
| | |
| | query_interface['suggested_queries'] = self._generate_suggested_queries(df) |
| | |
| | |
| | query_interface['metadata'] = { |
| | 'row_count': len(df), |
| | 'column_count': len(df.columns), |
| | 'memory_usage': df.memory_usage(deep=True).sum(), |
| | 'data_types': df.dtypes.astype(str).to_dict() |
| | } |
| | |
| | except Exception as e: |
| | query_interface['error'] = str(e) |
| | |
| | return query_interface |
| | |
| | def _analyze_metric_trends(self, df, column): |
| | """Helper method to analyze trends in numeric columns""" |
| | trends = {} |
| | if 'date' in df.columns: |
| | df['date'] = pd.to_datetime(df['date']) |
| | time_series = df.groupby('date')[column].mean() |
| | if len(time_series) > 2: |
| | |
| | x = np.arange(len(time_series)) |
| | y = time_series.values |
| | slope, intercept = np.polyfit(x, y, 1) |
| | trends['slope'] = float(slope) |
| | trends['trend_direction'] = 'increasing' if slope > 0 else 'decreasing' |
| | trends['trend_strength'] = float(abs(slope) / time_series.mean()) |
| | return trends |
| | |
| | def _analyze_distributions(self, df): |
| | """Helper method to analyze value distributions""" |
| | distributions = {} |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | |
| | for col in numeric_cols: |
| | if df[col].nunique() > 5: |
| | |
| | _, p_value = stats.normaltest(df[col].dropna()) |
| | skewness = float(df[col].skew()) |
| | kurtosis = float(df[col].kurtosis()) |
| | |
| | distributions[col] = { |
| | 'distribution_type': 'normal' if p_value > 0.05 else 'non_normal', |
| | 'skewness': skewness, |
| | 'kurtosis': kurtosis |
| | } |
| | return distributions |
| | |
| | def _generate_suggested_queries(self, df): |
| | """Helper method to generate relevant query suggestions""" |
| | suggestions = [] |
| | |
| | |
| | if 'date' in df.columns: |
| | suggestions.extend([ |
| | "Show the trend over time", |
| | "Compare year-over-year growth", |
| | "Find seasonal patterns" |
| | ]) |
| | |
| | |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) > 0: |
| | suggestions.extend([ |
| | f"Analyze the distribution of {col}" for col in numeric_cols[:3] |
| | ]) |
| | |
| | |
| | categorical_cols = df.select_dtypes(include=['object']).columns |
| | if len(categorical_cols) > 0: |
| | suggestions.extend([ |
| | f"Break down metrics by {col}" for col in categorical_cols[:3] |
| | ]) |
| | |
| | return suggestions |
| |
|
| | def enhance_cross_industry_correlations(self, df): |
| | """ |
| | Enhanced analysis of correlations across different industries |
| | """ |
| | correlations = { |
| | 'metric_correlations': {}, |
| | 'industry_patterns': {}, |
| | 'shared_trends': {} |
| | } |
| | |
| | if 'industry' in df.columns: |
| | industries = df['industry'].unique() |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | |
| | |
| | for ind1 in industries: |
| | for ind2 in industries: |
| | if ind1 < ind2: |
| | ind1_data = df[df['industry'] == ind1][numeric_cols] |
| | ind2_data = df[df['industry'] == ind2][numeric_cols] |
| | |
| | if not ind1_data.empty and not ind2_data.empty: |
| | common_metrics = set(ind1_data.columns) & set(ind2_data.columns) |
| | for metric in common_metrics: |
| | corr, p_value = pearsonr( |
| | ind1_data[metric].fillna(0), |
| | ind2_data[metric].fillna(0) |
| | ) |
| | correlations['metric_correlations'][f"{ind1}_{ind2}_{metric}"] = { |
| | 'correlation': float(corr), |
| | 'p_value': float(p_value) |
| | } |
| | |
| | |
| | if 'date' in df.columns: |
| | for metric in numeric_cols: |
| | industry_trends = {} |
| | for industry in industries: |
| | industry_data = df[df['industry'] == industry] |
| | if not industry_data.empty: |
| | trend = industry_data.groupby('date')[metric].mean() |
| | if len(trend) > 0: |
| | industry_trends[industry] = trend.to_dict() |
| | |
| | correlations['shared_trends'][metric] = industry_trends |
| | |
| | return correlations |
| |
|
| | def perform_market_basket_analysis(self, df: pd.DataFrame, min_support: float = 0.01, |
| | min_confidence: float = 0.3, min_lift: float = 1.0) -> dict: |
| | """ |
| | Perform advanced market basket analysis with support for multiple analytics dimensions. |
| | |
| | Args: |
| | df (pd.DataFrame): Input transaction data with required columns |
| | min_support (float): Minimum support threshold for frequent itemsets (default: 0.01) |
| | min_confidence (float): Minimum confidence threshold for rules (default: 0.3) |
| | min_lift (float): Minimum lift threshold for rules (default: 1.0) |
| | |
| | Returns: |
| | dict: Dictionary containing: |
| | - product_associations: Support, confidence, and lift metrics for product pairs |
| | - temporal_baskets: Time-based purchase patterns |
| | - product_clusters: Product groupings based on purchase behavior |
| | - customer_segments: Customer segments based on purchase patterns |
| | - performance_metrics: Key performance indicators |
| | |
| | Raises: |
| | ValueError: If required columns are missing or data validation fails |
| | """ |
| | try: |
| | |
| | required_columns = ['transaction_id', 'product_id'] |
| | if not all(col in df.columns for col in required_columns): |
| | raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}") |
| | |
| | if df.empty: |
| | raise ValueError("Empty dataframe provided") |
| | |
| | |
| | df = df.copy() |
| | |
| | |
| | baskets = (df.groupby('transaction_id')['product_id'] |
| | .agg(lambda x: frozenset(x.values)) |
| | .reset_index()) |
| | |
| | total_transactions = len(baskets) |
| | |
| | |
| | product_freq = df.groupby('product_id').size().to_dict() |
| | |
| | |
| | pairs_data = [] |
| | for products in baskets['product_id']: |
| | products_list = list(products) |
| | pairs_data.extend( |
| | tuple(sorted([p1, p2])) |
| | for i, p1 in enumerate(products_list) |
| | for p2 in products_list[i+1:] |
| | ) |
| | |
| | pair_freq = pd.Series(pairs_data).value_counts().to_dict() |
| | |
| | |
| | product_associations = { |
| | 'support': {}, |
| | 'confidence': {}, |
| | 'lift': {}, |
| | 'metrics_distribution': { |
| | 'support': {'min': float('inf'), 'max': 0, 'mean': 0}, |
| | 'confidence': {'min': float('inf'), 'max': 0, 'mean': 0}, |
| | 'lift': {'min': float('inf'), 'max': 0, 'mean': 0} |
| | } |
| | } |
| | |
| | valid_rules = [] |
| | for pair, freq in pair_freq.items(): |
| | prod1, prod2 = pair |
| | support = freq / total_transactions |
| | |
| | if support >= min_support: |
| | confidence_1_2 = freq / product_freq[prod1] |
| | confidence_2_1 = freq / product_freq[prod2] |
| | max_confidence = max(confidence_1_2, confidence_2_1) |
| | |
| | if max_confidence >= min_confidence: |
| | lift = (freq * total_transactions) / (product_freq[prod1] * product_freq[prod2]) |
| | |
| | if lift >= min_lift: |
| | valid_rules.append({ |
| | 'pair': pair, |
| | 'support': support, |
| | 'confidence': max_confidence, |
| | 'lift': lift |
| | }) |
| | |
| | |
| | pair_key = f"({prod1}, {prod2})" |
| | product_associations['support'][pair_key] = float(support) |
| | product_associations['confidence'][pair_key] = float(max_confidence) |
| | product_associations['lift'][pair_key] = float(lift) |
| | |
| | |
| | for metric_type, value in [('support', support), |
| | ('confidence', max_confidence), |
| | ('lift', lift)]: |
| | dist = product_associations['metrics_distribution'][metric_type] |
| | dist['min'] = min(dist['min'], value) |
| | dist['max'] = max(dist['max'], value) |
| | |
| | |
| | for metric_type in ['support', 'confidence', 'lift']: |
| | values = [rule[metric_type] for rule in valid_rules] |
| | if values: |
| | product_associations['metrics_distribution'][metric_type]['mean'] = float(sum(values) / len(values)) |
| | else: |
| | product_associations['metrics_distribution'][metric_type] = {'min': 0, 'max': 0, 'mean': 0} |
| | |
| | |
| | temporal_patterns = self._analyze_temporal_patterns(df) if 'timestamp' in df.columns else {} |
| | |
| | |
| | product_clusters = self._perform_product_clustering(df) if 'quantity' in df.columns else {} |
| | |
| | |
| | customer_segments = self._analyze_customer_segments(df) if 'customer_id' in df.columns else {} |
| | |
| | |
| | performance_metrics = { |
| | 'total_transactions': total_transactions, |
| | 'unique_products': len(product_freq), |
| | 'avg_basket_size': float(df.groupby('transaction_id')['product_id'].count().mean()), |
| | 'total_rules_found': len(valid_rules), |
| | 'rules_distribution': { |
| | 'strong_associations': len([r for r in valid_rules if r['lift'] > 2]), |
| | 'moderate_associations': len([r for r in valid_rules if 1 < r['lift'] <= 2]), |
| | 'weak_associations': len([r for r in valid_rules if r['lift'] <= 1]) |
| | } |
| | } |
| | |
| | return { |
| | 'product_associations': product_associations, |
| | 'temporal_baskets': temporal_patterns, |
| | 'product_clusters': product_clusters, |
| | 'customer_segments': customer_segments, |
| | 'performance_metrics': performance_metrics |
| | } |
| | |
| | except Exception as e: |
| | print(f"Error in market basket analysis: {str(e)}") |
| | raise ValueError(f"Market basket analysis failed: {str(e)}") from e |
| | |
| | def _analyze_temporal_patterns(self, df: pd.DataFrame) -> dict: |
| | """Analyze temporal patterns in purchase behavior""" |
| | patterns = { |
| | 'daily_patterns': {}, |
| | 'weekly_patterns': {}, |
| | 'monthly_patterns': {}, |
| | 'hourly_patterns': {} |
| | } |
| | |
| | try: |
| | timestamps = pd.to_datetime(df['timestamp']) |
| | |
| | for period, grouper in [ |
| | ('hourly_patterns', timestamps.dt.hour), |
| | ('daily_patterns', timestamps.dt.day), |
| | ('weekly_patterns', timestamps.dt.dayofweek), |
| | ('monthly_patterns', timestamps.dt.month) |
| | ]: |
| | pattern_data = df.groupby(grouper).agg({ |
| | 'product_id': ['count', 'nunique'], |
| | 'transaction_id': 'nunique', |
| | 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count'] |
| | }).round(2) |
| | |
| | patterns[period] = { |
| | 'transaction_count': pattern_data['transaction_id']['nunique'].to_dict(), |
| | 'product_count': pattern_data['product_id']['count'].to_dict(), |
| | 'unique_products': pattern_data['product_id']['nunique'].to_dict(), |
| | 'total_quantity': pattern_data['quantity']['sum'].to_dict() if 'quantity' in df.columns else {}, |
| | 'avg_quantity': pattern_data['quantity']['mean'].to_dict() if 'quantity' in df.columns else {} |
| | } |
| | |
| | except (ValueError, KeyError) as e: |
| | print(f"Error in temporal pattern analysis: {str(e)}") |
| | return patterns |
| | |
| | return patterns |
| | |
| | def _perform_product_clustering(self, df: pd.DataFrame) -> dict: |
| | """Perform advanced product clustering analysis""" |
| | try: |
| | |
| | product_features = df.groupby('product_id').agg({ |
| | 'quantity': ['mean', 'std', 'sum', 'count'], |
| | 'transaction_id': 'nunique' |
| | }).fillna(0) |
| | |
| | |
| | product_features['quantity_per_transaction'] = ( |
| | product_features['quantity']['sum'] / |
| | product_features['transaction_id']['nunique'] |
| | ) |
| | |
| | |
| | features_for_clustering = product_features.copy() |
| | features_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col |
| | for col in features_for_clustering.columns] |
| | |
| | if len(features_for_clustering) > 1: |
| | |
| | scaler = StandardScaler() |
| | scaled_features = scaler.fit_transform(features_for_clustering) |
| | |
| | |
| | max_clusters = min(5, len(features_for_clustering) - 1) |
| | scores = [] |
| | for k in range(2, max_clusters + 1): |
| | kmeans = KMeans(n_clusters=k, random_state=42) |
| | clusters = kmeans.fit_predict(scaled_features) |
| | score = silhouette_score(scaled_features, clusters) |
| | scores.append((k, score)) |
| | |
| | |
| | optimal_k = max(scores, key=lambda x: x[1])[0] |
| | kmeans = KMeans(n_clusters=optimal_k, random_state=42) |
| | clusters = kmeans.fit_predict(scaled_features) |
| | |
| | |
| | cluster_data = { |
| | 'cluster_assignments': { |
| | prod: int(cluster) for prod, cluster in zip(product_features.index, clusters) |
| | }, |
| | 'cluster_profiles': {}, |
| | 'evaluation_metrics': { |
| | 'silhouette_score': float(max(scores, key=lambda x: x[1])[1]), |
| | 'num_clusters': optimal_k |
| | } |
| | } |
| | |
| | |
| | for cluster_id in range(optimal_k): |
| | cluster_mask = clusters == cluster_id |
| | cluster_data['cluster_profiles'][str(cluster_id)] = { |
| | 'size': int(sum(cluster_mask)), |
| | 'avg_quantity': float(product_features['quantity']['mean'][cluster_mask].mean()), |
| | 'avg_transactions': float(product_features['transaction_id']['nunique'][cluster_mask].mean()), |
| | 'total_quantity': float(product_features['quantity']['sum'][cluster_mask].sum()), |
| | 'purchase_frequency': float( |
| | (product_features['quantity']['count'][cluster_mask].sum() / |
| | product_features['transaction_id']['nunique'][cluster_mask].sum()) |
| | ) |
| | } |
| | |
| | return cluster_data |
| | |
| | except np.linalg.LinAlgError as e: |
| | print(f"Error in clustering computation: {str(e)}") |
| | return {} |
| | except (ValueError, KeyError) as e: |
| | print(f"Error in product clustering: {str(e)}") |
| | return {} |
| | |
| | return {} |
| | |
| | def _analyze_customer_segments(self, df: pd.DataFrame) -> dict: |
| | """Analyze customer segments based on purchase behavior""" |
| | try: |
| | if 'customer_id' not in df.columns: |
| | return {} |
| | |
| | customer_stats = df.groupby('customer_id').agg({ |
| | 'transaction_id': 'nunique', |
| | 'product_id': ['nunique', 'count'], |
| | 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count', 'mean'] |
| | }) |
| | |
| | |
| | if 'timestamp' in df.columns: |
| | current_date = pd.to_datetime(df['timestamp']).max() |
| | customer_stats['recency'] = df.groupby('customer_id')['timestamp'].max().apply( |
| | lambda x: (current_date - pd.to_datetime(x)).days |
| | ) |
| | |
| | |
| | stats_for_clustering = customer_stats.copy() |
| | stats_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col |
| | for col in stats_for_clustering.columns] |
| | |
| | if len(stats_for_clustering) > 1: |
| | scaler = StandardScaler() |
| | scaled_features = scaler.fit_transform(stats_for_clustering) |
| | |
| | |
| | dbscan = DBSCAN(eps=0.5, min_samples=3) |
| | clusters = dbscan.fit_predict(scaled_features) |
| | |
| | return { |
| | 'customer_segments': { |
| | str(cust): int(cluster) for cust, cluster in zip(customer_stats.index, clusters) |
| | }, |
| | 'segment_profiles': { |
| | str(segment): { |
| | 'size': int(sum(clusters == segment)), |
| | 'avg_transactions': float(customer_stats['transaction_id']['nunique'][clusters == segment].mean()), |
| | 'avg_products': float(customer_stats['product_id']['nunique'][clusters == segment].mean()) |
| | } |
| | for segment in set(clusters) if segment != -1 |
| | }, |
| | 'segment_statistics': { |
| | 'num_segments': len(set(clusters) - {-1}), |
| | 'noise_points': int(sum(clusters == -1)) |
| | } |
| | } |
| | |
| | except Exception as e: |
| | print(f"Error in customer segmentation: {str(e)}") |
| | return {} |
| | |
| | def _calculate_correlations(self, df: pd.DataFrame) -> dict: |
| | """Calculate correlations between numeric columns with detailed statistics""" |
| | correlations = {} |
| | |
| | try: |
| | numeric_cols = df.select_dtypes(include=[np.number]).columns |
| | if len(numeric_cols) < 2: |
| | return correlations |
| | |
| | |
| | corr_matrix = df[numeric_cols].corr() |
| | |
| | |
| | for col1 in numeric_cols: |
| | correlations[col1] = {} |
| | for col2 in numeric_cols: |
| | if col1 != col2: |
| | correlation = corr_matrix.loc[col1, col2] |
| | if not np.isnan(correlation): |
| | |
| | coef, p_value = pearsonr(df[col1].fillna(0), df[col2].fillna(0)) |
| | correlations[col1][col2] = { |
| | 'coefficient': float(correlation), |
| | 'p_value': float(p_value), |
| | 'strength': 'strong' if abs(correlation) > 0.7 |
| | else 'moderate' if abs(correlation) > 0.3 |
| | else 'weak', |
| | 'direction': 'positive' if correlation > 0 else 'negative', |
| | 'sample_size': len(df) |
| | } |
| | |
| | except Exception as e: |
| | print(f"Error calculating correlations: {str(e)}") |
| | return {} |
| | |
| | return correlations |
| |
|