import os
import uuid
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
import threading
import time
import logging
from scipy import stats
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler
from sklearn.ensemble import (RandomForestRegressor, RandomForestClassifier,
                              GradientBoostingRegressor, GradientBoostingClassifier)
from sklearn.linear_model import LinearRegression, LogisticRegression, Ridge, Lasso
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics import (mean_squared_error, r2_score, classification_report,
                             confusion_matrix, silhouette_score)
from sklearn.feature_selection import SelectKBest, f_regression, f_classif, mutual_info_regression
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.utils import PlotlyJSONEncoder
import io
import base64
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
import warnings

warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MODELS_FOLDER = '/tmp/models'
MAX_FILE_SIZE = 1024 * 1024 * 1024  # 1GB for enterprise
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv', 'feather'}
FILE_EXPIRY_HOURS = 24  # Extended for enterprise use

# Ensure directories exist
for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]:
    os.makedirs(folder, exist_ok=True)

# Enhanced file storage with metadata
file_storage = {}
model_storage = {}
analysis_history = {}


class EnterpriseAnalytics:
    """Enterprise-grade analytics engine"""

    def __init__(self):
        self.scaler = StandardScaler()
        self.models = {}

    def advanced_data_profiling(self, df):
        """Comprehensive data profiling like enterprise tools"""
        profile = {
            'dataset_overview': {
                'rows': len(df),
                'columns': len(df.columns),
                'memory_usage': df.memory_usage(deep=True).sum(),
                'duplicate_rows': df.duplicated().sum()
            },
            'column_analysis': {},
            'data_quality': {},
            'relationships': {},
            'recommendations': []
        }

        for col in df.columns:
            col_data = df[col]
            col_profile = {
                'dtype': str(col_data.dtype),
                'missing_count': col_data.isnull().sum(),
                'missing_percentage': (col_data.isnull().sum() / len(df)) * 100,
                'unique_values': col_data.nunique(),
                'cardinality': col_data.nunique() / len(df) if len(df) > 0 else 0
            }

            if pd.api.types.is_numeric_dtype(col_data):
                col_profile.update({
                    'statistics': {
                        'mean': col_data.mean(),
                        'median': col_data.median(),
                        'std': col_data.std(),
                        'min': col_data.min(),
                        'max': col_data.max(),
                        'q25': col_data.quantile(0.25),
                        'q75': col_data.quantile(0.75),
                        'skewness': stats.skew(col_data.dropna()),
                        'kurtosis': stats.kurtosis(col_data.dropna())
                    },
                    'distribution': 'normal' if abs(stats.skew(col_data.dropna())) < 0.5 else 'skewed'
                })
            else:
                col_profile.update({
                    'top_categories': col_data.value_counts().head(10).to_dict(),
                    'category_distribution': 'uniform'
                    if col_data.value_counts().std() < col_data.value_counts().mean() * 0.5
                    else 'imbalanced'
                })

            profile['column_analysis'][col] = col_profile

        # Data quality assessment
        profile['data_quality'] = {
            'completeness_score': (1 - df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
            'uniqueness_score': (df.nunique().sum() / (len(df) * len(df.columns))) * 100,
            'consistency_score': self._calculate_consistency_score(df)
        }

        # Generate recommendations
        profile['recommendations'] = self._generate_recommendations(df, profile)

        return profile

    def _calculate_consistency_score(self, df):
        """Calculate data consistency score"""
        score = 100
        for col in df.select_dtypes(include=['object']):
            # Check for inconsistent formatting
            values = df[col].dropna().astype(str)
            if len(values) > 0:
                # Check for mixed case
                if len(set([v.lower() for v in values])) != len(set(values)):
                    score -= 5
                # Check for leading/trailing spaces
                if any(v != v.strip() for v in values):
                    score -= 5
        return max(0, score)

    def _generate_recommendations(self, df, profile):
        """Generate actionable recommendations"""
        recommendations = []

        # High missing value columns
        for col, analysis in profile['column_analysis'].items():
            if analysis['missing_percentage'] > 20:
                recommendations.append({
                    'type': 'data_quality',
                    'priority': 'high',
                    'message': f"Column '{col}' has {analysis['missing_percentage']:.1f}% missing values. Consider imputation or removal.",
                    'action': 'handle_missing_values'
                })

        # High cardinality categorical columns
        for col, analysis in profile['column_analysis'].items():
            if analysis.get('cardinality', 0) > 0.8 and df[col].dtype == 'object':
                recommendations.append({
                    'type': 'feature_engineering',
                    'priority': 'medium',
                    'message': f"Column '{col}' has high cardinality. Consider feature encoding or dimensionality reduction.",
                    'action': 'encode_categorical'
                })

        # Skewed distributions
        for col, analysis in profile['column_analysis'].items():
            if 'statistics' in analysis and abs(analysis['statistics']['skewness']) > 2:
                recommendations.append({
                    'type': 'data_transformation',
                    'priority': 'medium',
                    'message': f"Column '{col}' is highly skewed. Consider log transformation or scaling.",
                    'action': 'transform_distribution'
                })

        return recommendations
    def advanced_feature_engineering(self, df, target_column=None):
        """Enterprise-level feature engineering"""
        engineered_features = {}

        # Numeric feature engineering
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if col != target_column:
                # Polynomial features
                engineered_features[f'{col}_squared'] = df[col] ** 2
                engineered_features[f'{col}_log'] = np.log1p(df[col].abs())
                # Binning
                engineered_features[f'{col}_binned'] = pd.cut(df[col], bins=5, labels=False)
                # Rolling statistics (if data has time component)
                if len(df) > 10:
                    engineered_features[f'{col}_rolling_mean'] = df[col].rolling(window=min(5, len(df) // 2)).mean()

        # Categorical feature engineering
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if col != target_column:
                # Frequency encoding
                freq_map = df[col].value_counts().to_dict()
                engineered_features[f'{col}_frequency'] = df[col].map(freq_map)
                # Target encoding (only when a numeric target is provided;
                # a group mean is undefined for non-numeric targets)
                if (target_column and target_column in df.columns
                        and pd.api.types.is_numeric_dtype(df[target_column])):
                    target_mean = df.groupby(col)[target_column].mean()
                    engineered_features[f'{col}_target_encoded'] = df[col].map(target_mean)

        # Interaction features
        if len(numeric_cols) >= 2:
            col_pairs = [(numeric_cols[i], numeric_cols[j])
                         for i in range(len(numeric_cols))
                         for j in range(i + 1, min(i + 3, len(numeric_cols)))]  # Limit combinations
            for col1, col2 in col_pairs:
                if col1 != target_column and col2 != target_column:
                    engineered_features[f'{col1}_{col2}_interaction'] = df[col1] * df[col2]
                    engineered_features[f'{col1}_{col2}_ratio'] = df[col1] / (df[col2] + 1e-8)

        return pd.DataFrame(engineered_features, index=df.index)

    def automated_ml_pipeline(self, df, target_column, problem_type='auto'):
        """Enterprise AutoML pipeline"""
        results = {
            'preprocessing': {},
            'feature_selection': {},
            'models': {},
            'best_model': {},
            'predictions': {},
            'feature_importance': {}
        }

        # Determine problem type
        if problem_type == 'auto':
            if df[target_column].dtype in ['object', 'category'] or df[target_column].nunique() < 10:
                problem_type = 'classification'
            else:
                problem_type = 'regression'

        # Preprocessing
        feature_cols = [col for col in df.columns if col != target_column]
        X = df[feature_cols].copy()
        y = df[target_column].copy()

        # Handle missing values
        X_numeric = X.select_dtypes(include=[np.number])
        X_categorical = X.select_dtypes(include=['object'])

        if not X_numeric.empty:
            X_numeric = X_numeric.fillna(X_numeric.median())
        if not X_categorical.empty:
            X_categorical = X_categorical.fillna(
                X_categorical.mode().iloc[0] if not X_categorical.mode().empty else 'Unknown')

        # Encode categorical variables
        if not X_categorical.empty:
            le = LabelEncoder()
            for col in X_categorical.columns:
                X_categorical[col] = le.fit_transform(X_categorical[col].astype(str))

        X_processed = pd.concat([X_numeric, X_categorical], axis=1)

        # Handle target variable for classification
        if problem_type == 'classification' and y.dtype == 'object':
            le_target = LabelEncoder()
            y = le_target.fit_transform(y)

        # Feature selection (use a score function that matches the problem type)
        if len(X_processed.columns) > 10:
            score_func = f_classif if problem_type == 'classification' else f_regression
            selector = SelectKBest(score_func, k=min(10, len(X_processed.columns)))
            X_selected = selector.fit_transform(X_processed, y)
            selected_features = X_processed.columns[selector.get_support()].tolist()
            X_processed = pd.DataFrame(X_selected, columns=selected_features)
            results['feature_selection']['selected_features'] = selected_features

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X_processed, y, test_size=0.2, random_state=42
        )
        # Scale features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Model selection based on problem type
        if problem_type == 'regression':
            models = {
                'Linear Regression': LinearRegression(),
                'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
                'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
                'Ridge Regression': Ridge()
            }
        else:
            models = {
                'Logistic Regression': LogisticRegression(random_state=42),
                'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
                'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
            }

        # Train and evaluate models
        best_score = -np.inf if problem_type == 'regression' else 0
        best_model_name = None

        for name, model in models.items():
            try:
                # Cross-validation
                if problem_type == 'regression':
                    scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2')
                    score = scores.mean()
                else:
                    scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='accuracy')
                    score = scores.mean()

                # Train final model
                model.fit(X_train_scaled, y_train)
                y_pred = model.predict(X_test_scaled)

                if problem_type == 'regression':
                    test_score = r2_score(y_test, y_pred)
                    mse = mean_squared_error(y_test, y_pred)
                    results['models'][name] = {
                        'cv_score': score,
                        'test_r2': test_score,
                        'test_mse': mse,
                        'predictions': y_pred.tolist()
                    }
                else:
                    test_score = model.score(X_test_scaled, y_test)
                    results['models'][name] = {
                        'cv_score': score,
                        'test_accuracy': test_score,
                        'predictions': y_pred.tolist()
                    }

                # Track best model
                if score > best_score:
                    best_score = score
                    best_model_name = name

                    # Feature importance
                    if hasattr(model, 'feature_importances_'):
                        importance = dict(zip(X_processed.columns, model.feature_importances_))
                        results['feature_importance'] = dict(
                            sorted(importance.items(), key=lambda x: x[1], reverse=True))

            except Exception as e:
                logger.error(f"Error training {name}: {str(e)}")
                continue

        results['best_model'] = {
            'name': best_model_name,
            'score': best_score,
            'problem_type': problem_type
        }

        results['preprocessing'] = {
            'numeric_features': X_numeric.columns.tolist() if not X_numeric.empty else [],
            'categorical_features': X_categorical.columns.tolist() if not X_categorical.empty else [],
            'scaling_applied': True,
            'missing_values_handled': True
        }

        return results

    def advanced_clustering_analysis(self, df, n_clusters=None):
        """Enterprise clustering with multiple algorithms"""
        # Prepare data
        numeric_df = df.select_dtypes(include=[np.number])
        if numeric_df.empty:
            raise ValueError("No numeric columns for clustering")

        # Handle missing values
        numeric_df = numeric_df.fillna(numeric_df.median())

        # Scale data
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(numeric_df)

        results = {
            'algorithms': {},
            'optimal_clusters': {},
            'silhouette_scores': {},
            'recommendations': []
        }

        # Determine optimal number of clusters if not provided
        if n_clusters is None:
            # Elbow method for K-means
            inertias = []
            k_range = range(2, min(11, len(numeric_df) // 2))
            for k in k_range:
                kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
                kmeans.fit(X_scaled)
                inertias.append(kmeans.inertia_)

            # Find elbow point (simplified)
            if len(inertias) > 2:
                diffs = np.diff(inertias)
                second_diffs = np.diff(diffs)
                n_clusters = k_range[np.argmax(second_diffs) + 1] if len(second_diffs) > 0 else 3
            else:
                n_clusters = 3
        # Apply multiple clustering algorithms
        algorithms = {
            'K-Means': KMeans(n_clusters=n_clusters, random_state=42, n_init=10),
            'Hierarchical': AgglomerativeClustering(n_clusters=n_clusters),
            'DBSCAN': DBSCAN(eps=0.5, min_samples=5)
        }

        for name, algo in algorithms.items():
            try:
                if name == 'DBSCAN':
                    labels = algo.fit_predict(X_scaled)
                    n_clusters_found = len(set(labels)) - (1 if -1 in labels else 0)
                else:
                    labels = algo.fit_predict(X_scaled)
                    n_clusters_found = n_clusters

                # Calculate silhouette score
                if len(set(labels)) > 1:
                    sil_score = silhouette_score(X_scaled, labels)
                else:
                    sil_score = 0

                results['algorithms'][name] = {
                    'labels': labels.tolist(),
                    'n_clusters': n_clusters_found,
                    'silhouette_score': sil_score
                }
                results['silhouette_scores'][name] = sil_score

            except Exception as e:
                logger.error(f"Error in {name} clustering: {str(e)}")
                continue

        # PCA for visualization
        if len(numeric_df.columns) > 2:
            pca = PCA(n_components=2)
            X_pca = pca.fit_transform(X_scaled)
            results['pca_components'] = X_pca.tolist()
            results['pca_explained_variance'] = pca.explained_variance_ratio_.tolist()

        # Generate recommendations (guard against the case where every algorithm failed)
        if results['silhouette_scores']:
            best_algo = max(results['silhouette_scores'].items(), key=lambda x: x[1])[0]
            results['recommendations'].append({
                'type': 'clustering',
                'message': f"Best clustering algorithm: {best_algo} with silhouette score: "
                           f"{results['silhouette_scores'][best_algo]:.3f}",
                'optimal_clusters': results['algorithms'][best_algo]['n_clusters']
            })

        return results

    def time_series_analysis(self, df, date_column, value_column):
        """Advanced time series analysis"""
        # Work on a copy so the caller's frame is not mutated
        df = df.copy()

        # Convert date column
        df[date_column] = pd.to_datetime(df[date_column])
        df = df.sort_values(date_column)

        # Set date as index
        ts_df = df.set_index(date_column)[value_column]

        results = {
            'trend_analysis': {},
            'seasonality': {},
            'forecasting': {},
            'anomalies': {},
            'statistics': {}
        }

        # Basic statistics
        results['statistics'] = {
            'mean': ts_df.mean(),
            'std': ts_df.std(),
            'min': ts_df.min(),
            'max': ts_df.max(),
            'trend': 'increasing' if ts_df.iloc[-1] > ts_df.iloc[0] else 'decreasing'
        }

        # Trend analysis using linear regression
        X = np.arange(len(ts_df)).reshape(-1, 1)
        y = ts_df.values
        lr = LinearRegression()
        lr.fit(X, y)
        trend_slope = lr.coef_[0]

        results['trend_analysis'] = {
            'slope': trend_slope,
            'direction': 'increasing' if trend_slope > 0 else 'decreasing',
            'strength': abs(trend_slope)
        }

        # Simple anomaly detection using IQR
        Q1 = ts_df.quantile(0.25)
        Q3 = ts_df.quantile(0.75)
        IQR = Q3 - Q1
        anomalies = ts_df[(ts_df < Q1 - 1.5 * IQR) | (ts_df > Q3 + 1.5 * IQR)]

        results['anomalies'] = {
            'count': len(anomalies),
            'dates': anomalies.index.strftime('%Y-%m-%d').tolist(),
            'values': anomalies.values.tolist()
        }

        # Simple forecasting (moving average)
        window = min(7, len(ts_df) // 4)
        if window > 0:
            forecast_periods = min(10, len(ts_df) // 4)
            last_values = ts_df.tail(window).mean()
            results['forecasting'] = {
                'method': 'moving_average',
                'forecast_periods': forecast_periods,
                'forecast_values': [last_values] * forecast_periods
            }

        return results


# Initialize analytics engine
analytics_engine = EnterpriseAnalytics()


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
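
# Illustrative sketch (not part of the original API surface): the analytics engine can be
# exercised directly, without the HTTP layer, which is handy for quick smoke tests. The toy
# DataFrame below is hypothetical; its column names and values are arbitrary assumptions.
def _profiling_smoke_test():
    """Run advanced_data_profiling on a small synthetic frame and print the quality scores."""
    demo_df = pd.DataFrame({
        'region': ['north', 'south', 'south', None, 'east'],
        'revenue': [120.0, 85.5, 97.2, 110.0, None],
        'units': [10, 7, 8, 9, 11],
    })
    report = analytics_engine.advanced_data_profiling(demo_df)
    # default=str covers numpy scalars that json cannot serialize natively
    print(json.dumps(report['data_quality'], indent=2, default=str))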
def cleanup_old_files():
    """Enhanced cleanup with model cleanup"""
    try:
        # Existing cleanup logic: remove expired files from every managed folder
        for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]:
            for root, dirs, files in os.walk(folder):
                for file in files:
                    filepath = os.path.join(root, file)
                    if get_file_age(filepath) > FILE_EXPIRY_HOURS:
                        os.remove(filepath)
                        logger.info(f"Cleaned up old file: {filepath}")

        # Clean up storage entries
        current_time = datetime.now()
        for storage in [file_storage, model_storage, analysis_history]:
            sessions_to_remove = []
            for session_id, session_data in storage.items():
                if isinstance(session_data, dict):
                    items_to_remove = []
                    for item_id, item_info in session_data.items():
                        if 'timestamp' in item_info:
                            item_time = datetime.fromisoformat(item_info['timestamp'])
                            if (current_time - item_time).total_seconds() > FILE_EXPIRY_HOURS * 3600:
                                items_to_remove.append(item_id)
                    for item_id in items_to_remove:
                        del session_data[item_id]
                    if not session_data:
                        sessions_to_remove.append(session_id)
            for session_id in sessions_to_remove:
                del storage[session_id]

    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")


def get_file_age(filepath):
    """Get file age in hours"""
    if os.path.exists(filepath):
        file_time = os.path.getmtime(filepath)
        return (time.time() - file_time) / 3600
    return float('inf')


def load_data_file(filepath, filename):
    """Enhanced data loading with more formats"""
    try:
        file_ext = filename.rsplit('.', 1)[1].lower()
        if file_ext == 'csv':
            return pd.read_csv(filepath)
        elif file_ext in ['xlsx', 'xls']:
            return pd.read_excel(filepath)
        elif file_ext == 'json':
            return pd.read_json(filepath)
        elif file_ext == 'parquet':
            return pd.read_parquet(filepath)
        elif file_ext == 'tsv':
            return pd.read_csv(filepath, sep='\t')
        elif file_ext == 'feather':
            return pd.read_feather(filepath)
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")


# Setup enhanced scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(func=cleanup_old_files, trigger="interval", hours=1)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())


# API Endpoints
@app.route('/api/health', methods=['GET'])
def health_check():
    return jsonify({
        'status': 'healthy',
        'version': '2.0.0-enterprise',
        'features': ['advanced_profiling', 'automl', 'clustering', 'time_series'],
        'timestamp': datetime.now().isoformat()
    })
@app.route('/api/upload', methods=['POST'])
def upload_file():
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        session_id = request.form.get('sessionId')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported'}), 400

        # Check file size
        file.seek(0, 2)
        file_size = file.tell()
        file.seek(0)

        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400

        # Generate unique file ID and secure filename
        file_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)

        # Create session directory
        session_dir = os.path.join(UPLOAD_FOLDER, session_id)
        os.makedirs(session_dir, exist_ok=True)

        # Save file
        filepath = os.path.join(session_dir, f"{file_id}_{filename}")
        file.save(filepath)

        # Enhanced file metadata
        if session_id not in file_storage:
            file_storage[session_id] = {}

        file_storage[session_id][file_id] = {
            'filename': filename,
            'filepath': filepath,
            'size': file_size,
            'timestamp': datetime.now().isoformat(),
            'format': filename.rsplit('.', 1)[1].lower(),
            'status': 'uploaded'
        }

        return jsonify({
            'fileId': file_id,
            'filename': filename,
            'size': file_size,
            'format': filename.rsplit('.', 1)[1].lower(),
            'message': 'File uploaded successfully'
        })

    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/profile/<file_id>', methods=['GET'])
def profile_data(file_id):
    """Advanced data profiling endpoint"""
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Perform advanced profiling
        profile = analytics_engine.advanced_data_profiling(df)
        return jsonify(profile)

    except Exception as e:
        logger.error(f"Profiling error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/automl', methods=['POST'])
def run_automl():
    """Automated ML pipeline endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        target_column = data.get('targetColumn')
        problem_type = data.get('problemType', 'auto')

        if not all([session_id, file_id, target_column]):
            return jsonify({'error': 'Session ID, File ID, and target column required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        if target_column not in df.columns:
            return jsonify({'error': f'Target column {target_column} not found'}), 400

        # Run AutoML pipeline
        results = analytics_engine.automated_ml_pipeline(df, target_column, problem_type)

        # Save results
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_automl.json")

        with open(result_filepath, 'w') as f:
            json.dump(results, f, indent=2, default=str)

        return jsonify({
            'resultId': result_id,
            'results': results,
            'analysisType': 'automl',
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"AutoML error: {str(e)}")
        return jsonify({'error': str(e)}), 500
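
# Illustrative client flow (assumptions: the service is reachable at http://localhost:7860,
# the `requests` package is installed, and 'data.csv' / 'target' are placeholder names).
# Shows how a caller would chain /api/upload and /api/automl using the sessionId/fileId
# contract defined above. Defined only as a sketch; it is never invoked by the server.
def _example_automl_client(base_url='http://localhost:7860', csv_path='data.csv',
                           session_id='demo-session', target_column='target'):
    import requests  # local import so the server itself does not depend on requests

    # Upload the file under a client-chosen session ID
    with open(csv_path, 'rb') as fh:
        upload = requests.post(f"{base_url}/api/upload",
                               files={'file': fh},
                               data={'sessionId': session_id})
    upload.raise_for_status()
    file_id = upload.json()['fileId']

    # Kick off the AutoML pipeline on the uploaded file
    automl = requests.post(f"{base_url}/api/automl",
                           json={'sessionId': session_id,
                                 'fileId': file_id,
                                 'targetColumn': target_column})
    automl.raise_for_status()
    return automl.json()['results']['best_model']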
@app.route('/api/clustering', methods=['POST'])
def run_clustering():
    """Advanced clustering analysis endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        n_clusters = data.get('nClusters')

        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Run clustering analysis
        results = analytics_engine.advanced_clustering_analysis(df, n_clusters)

        # Save results
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_clustering.json")

        with open(result_filepath, 'w') as f:
            json.dump(results, f, indent=2, default=str)

        return jsonify({
            'resultId': result_id,
            'results': results,
            'analysisType': 'clustering',
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Clustering error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/timeseries', methods=['POST'])
def run_timeseries():
    """Time series analysis endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        date_column = data.get('dateColumn')
        value_column = data.get('valueColumn')

        if not all([session_id, file_id, date_column, value_column]):
            return jsonify({'error': 'Session ID, File ID, date column, and value column required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        if date_column not in df.columns or value_column not in df.columns:
            return jsonify({'error': 'Date or value column not found'}), 400

        # Run time series analysis
        results = analytics_engine.time_series_analysis(df, date_column, value_column)

        # Save results
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_timeseries.json")

        with open(result_filepath, 'w') as f:
            json.dump(results, f, indent=2, default=str)

        return jsonify({
            'resultId': result_id,
            'results': results,
            'analysisType': 'timeseries',
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Time series error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/feature-engineering', methods=['POST'])
def run_feature_engineering():
    """Feature engineering endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        target_column = data.get('targetColumn')

        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Generate engineered features
        engineered_df = analytics_engine.advanced_feature_engineering(df, target_column)

        # Save engineered dataset
        engineered_file_id = str(uuid.uuid4())
        engineered_filepath = os.path.join(
            PROCESSED_FOLDER, session_id, f"{engineered_file_id}_engineered.csv"
        )
        os.makedirs(os.path.dirname(engineered_filepath), exist_ok=True)

        # Combine original and engineered features
        combined_df = pd.concat([df, engineered_df], axis=1)
        combined_df.to_csv(engineered_filepath, index=False)

        # Store engineered file info
        if session_id not in file_storage:
            file_storage[session_id] = {}

        file_storage[session_id][engineered_file_id] = {
            'filename': f"{file_info['filename'].split('.')[0]}_engineered.csv",
            'filepath': engineered_filepath,
            'size': os.path.getsize(engineered_filepath),
            'timestamp': datetime.now().isoformat(),
            'format': 'csv',
            'status': 'engineered',
            'parent_file': file_id
        }

        return jsonify({
            'engineeredFileId': engineered_file_id,
            'originalFeatures': len(df.columns),
            'engineeredFeatures': len(engineered_df.columns),
            'totalFeatures': len(combined_df.columns),
            'featureNames': engineered_df.columns.tolist(),
            'message': 'Feature engineering completed successfully'
        })

    except Exception as e:
        logger.error(f"Feature engineering error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/advanced-visualization', methods=['POST'])
def create_advanced_visualization():
    """Advanced visualization endpoint with Plotly"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        chart_type = data.get('chartType')
        parameters = data.get('parameters', {})

        if not all([session_id, file_id, chart_type]):
            return jsonify({'error': 'Session ID, File ID, and chart type required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Create advanced visualizations using Plotly
        if chart_type == 'correlation_heatmap':
            numeric_df = df.select_dtypes(include=[np.number])
            corr_matrix = numeric_df.corr()
            fig = px.imshow(corr_matrix, title='Correlation Heatmap',
                            color_continuous_scale='RdBu_r', aspect='auto')

        elif chart_type == 'distribution_plots':
            column = parameters.get('column')
            if not column or column not in df.columns:
                return jsonify({'error': 'Column not specified or not found'}), 400
            fig = px.histogram(df, x=column, title=f'Distribution of {column}', marginal='box')

        elif chart_type == 'scatter_matrix':
            columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:4])
            fig = px.scatter_matrix(df[columns], title='Scatter Matrix', dimensions=columns)

        elif chart_type == 'parallel_coordinates':
            columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5])
            fig = px.parallel_coordinates(df, dimensions=columns, title='Parallel Coordinates Plot')

        elif chart_type == 'box_plots':
            columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5])
            fig = px.box(df[columns], title='Box Plots Comparison')

        elif chart_type == '3d_scatter':
            x_col = parameters.get('x_column')
            y_col = parameters.get('y_column')
            z_col = parameters.get('z_column')
            if not all([x_col, y_col, z_col]):
                return jsonify({'error': '3D scatter requires x, y, and z columns'}), 400
            fig = px.scatter_3d(df, x=x_col, y=y_col, z=z_col,
                                title=f'3D Scatter: {x_col} vs {y_col} vs {z_col}')

        else:
            return jsonify({'error': 'Unsupported chart type'}), 400

        # Convert to JSON
        chart_json = json.dumps(fig, cls=PlotlyJSONEncoder)

        return jsonify({
            'chart': chart_json,
            'chartType': chart_type,
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Visualization error: {str(e)}")
        return jsonify({'error': str(e)}), 500
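
# Illustrative consumer-side sketch for /api/advanced-visualization (assumptions: the caller
# has plotly installed and has already obtained the JSON response body as a dict). The
# endpoint above serializes the figure into the 'chart' field as a JSON string, so
# plotly.io.from_json can rebuild the figure object. Defined only as a sketch; never invoked.
def _example_render_chart(response_payload):
    import plotly.io as pio  # plotly is already a dependency of this module

    fig = pio.from_json(response_payload['chart'])
    return fig  # e.g. fig.write_html('chart.html') or fig.show() in a notebook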
@app.route('/api/data-quality', methods=['POST'])
def assess_data_quality():
    """Data quality assessment endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')

        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        quality_report = {
            'overall_score': 0,
            'dimensions': {
                'completeness': {},
                'consistency': {},
                'validity': {},
                'uniqueness': {},
                'accuracy': {}
            },
            'issues': [],
            'recommendations': []
        }

        # Completeness assessment
        total_cells = len(df) * len(df.columns)
        missing_cells = df.isnull().sum().sum()
        completeness_score = ((total_cells - missing_cells) / total_cells) * 100

        quality_report['dimensions']['completeness'] = {
            'score': completeness_score,
            'missing_values': df.isnull().sum().to_dict(),
            'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict()
        }

        # Consistency assessment
        consistency_issues = []
        for col in df.select_dtypes(include=['object']):
            # Check for inconsistent formatting
            values = df[col].dropna().astype(str)
            if len(values) > 0:
                # Mixed case issues
                lowercase_values = set(v.lower() for v in values)
                if len(lowercase_values) != len(set(values)):
                    consistency_issues.append(f"Column '{col}' has mixed case values")
                # Leading/trailing spaces
                if any(v != v.strip() for v in values):
                    consistency_issues.append(f"Column '{col}' has leading/trailing spaces")

        consistency_score = max(0, 100 - len(consistency_issues) * 10)
        quality_report['dimensions']['consistency'] = {
            'score': consistency_score,
            'issues': consistency_issues
        }

        # Validity assessment (basic data type validation)
        validity_issues = []
        for col in df.columns:
            if df[col].dtype == 'object':
                # Check for potential numeric columns stored as strings
                try:
                    pd.to_numeric(df[col].dropna(), errors='raise')
                    validity_issues.append(f"Column '{col}' appears to be numeric but stored as text")
                except (ValueError, TypeError):
                    pass

        validity_score = max(0, 100 - len(validity_issues) * 15)
        quality_report['dimensions']['validity'] = {
            'score': validity_score,
            'issues': validity_issues
        }

        # Uniqueness assessment
        uniqueness_scores = {}
        for col in df.columns:
            unique_ratio = df[col].nunique() / len(df) if len(df) > 0 else 0
            uniqueness_scores[col] = unique_ratio * 100

        avg_uniqueness = np.mean(list(uniqueness_scores.values()))
        quality_report['dimensions']['uniqueness'] = {
            'score': avg_uniqueness,
            'column_scores': uniqueness_scores,
            'duplicate_rows': df.duplicated().sum()
        }

        # Overall score calculation
        dimension_scores = [
            completeness_score,
            consistency_score,
            validity_score,
            avg_uniqueness
        ]
        quality_report['overall_score'] = np.mean(dimension_scores)

        # Generate recommendations
        if completeness_score < 80:
            quality_report['recommendations'].append({
                'type': 'completeness',
                'priority': 'high',
                'message': 'Consider imputing missing values or removing incomplete records'
            })
        if consistency_score < 70:
            quality_report['recommendations'].append({
                'type': 'consistency',
                'priority': 'medium',
                'message': 'Standardize text formatting and remove extra spaces'
            })
        if validity_score < 80:
            quality_report['recommendations'].append({
                'type': 'validity',
                'priority': 'medium',
                'message': 'Review data types and convert where appropriate'
            })

        return jsonify(quality_report)

    except Exception as e:
        logger.error(f"Data quality error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/statistical-tests', methods=['POST'])
def run_statistical_tests():
    """Statistical hypothesis testing endpoint"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        test_type = data.get('testType')
        parameters = data.get('parameters', {})

        if not all([session_id, file_id, test_type]):
            return jsonify({'error': 'Session ID, File ID, and test type required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        results = {'test_type': test_type, 'results': {}}

        if test_type == 'normality':
            column = parameters.get('column')
            if not column or column not in df.columns:
                return jsonify({'error': 'Column not specified or not found'}), 400

            data_col = df[column].dropna()

            # Shapiro-Wilk test
            shapiro_stat, shapiro_p = stats.shapiro(data_col.sample(min(5000, len(data_col))))

            # Anderson-Darling test
            anderson_result = stats.anderson(data_col)

            results['results'] = {
                'shapiro_wilk': {
                    'statistic': shapiro_stat,
                    'p_value': shapiro_p,
                    'is_normal': shapiro_p > 0.05
                },
                'anderson_darling': {
                    'statistic': anderson_result.statistic,
                    'critical_values': anderson_result.critical_values.tolist(),
                    'significance_levels': anderson_result.significance_level.tolist()
                }
            }

        elif test_type == 'correlation_significance':
            col1 = parameters.get('column1')
            col2 = parameters.get('column2')
            if not all([col1, col2]) or col1 not in df.columns or col2 not in df.columns:
                return jsonify({'error': 'Both columns must be specified and exist'}), 400

            # Drop rows where either column is missing so both tests see aligned pairs
            paired = df[[col1, col2]].dropna()

            # Pearson correlation
            pearson_corr, pearson_p = stats.pearsonr(paired[col1], paired[col2])
            # Spearman correlation
            spearman_corr, spearman_p = stats.spearmanr(paired[col1], paired[col2])

            results['results'] = {
                'pearson': {
                    'correlation': pearson_corr,
                    'p_value': pearson_p,
                    'significant': pearson_p < 0.05
                },
                'spearman': {
                    'correlation': spearman_corr,
                    'p_value': spearman_p,
                    'significant': spearman_p < 0.05
                }
            }

        elif test_type == 'group_comparison':
            group_col = parameters.get('groupColumn')
            value_col = parameters.get('valueColumn')
            if not all([group_col, value_col]):
                return jsonify({'error': 'Group and value columns required'}), 400

            groups = [group for name, group in df.groupby(group_col)[value_col] if len(group) > 1]

            if len(groups) == 2:
                # Two-sample t-test
                t_stat, t_p = stats.ttest_ind(groups[0], groups[1])
                # Mann-Whitney U test
                u_stat, u_p = stats.mannwhitneyu(groups[0], groups[1])

                results['results'] = {
                    'two_sample_ttest': {
                        'statistic': t_stat,
                        'p_value': t_p,
                        'significant': t_p < 0.05
                    },
                    'mann_whitney_u': {
                        'statistic': u_stat,
                        'p_value': u_p,
                        'significant': u_p < 0.05
                    }
                }
            elif len(groups) > 2:
                # ANOVA
                f_stat, f_p = stats.f_oneway(*groups)
                # Kruskal-Wallis test
                h_stat, h_p = stats.kruskal(*groups)

                results['results'] = {
                    'anova': {
                        'statistic': f_stat,
                        'p_value': f_p,
                        'significant': f_p < 0.05
                    },
                    'kruskal_wallis': {
                        'statistic': h_stat,
                        'p_value': h_p,
                        'significant': h_p < 0.05
                    }
                }

        else:
            return jsonify({'error': 'Unsupported test type'}), 400

        return jsonify(results)

    except Exception as e:
        logger.error(f"Statistical test error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/analysis-history/<session_id>', methods=['GET'])
def get_analysis_history(session_id):
    """Get analysis history for a session"""
    try:
        if session_id not in analysis_history:
            return jsonify({'history': []})
        return jsonify({'history': list(analysis_history[session_id].values())})
    except Exception as e:
        logger.error(f"History error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/export-report', methods=['POST'])
def export_analysis_report():
    """Export comprehensive analysis report"""
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        analyses = data.get('analyses', [])  # List of analysis result IDs

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400

        # Compile report
        report = {
            'session_id': session_id,
            'generated_at': datetime.now().isoformat(),
            'analyses': [],
            'summary': {
                'total_analyses': len(analyses),
                'data_files_processed': len(file_storage.get(session_id, {})),
                'recommendations': []
            }
        }

        # Load each analysis result
        for analysis_id in analyses:
            try:
                result_files = [
                    f for f in os.listdir(os.path.join(PROCESSED_FOLDER, session_id))
                    if f.startswith(analysis_id)
                ]
                if result_files:
                    filepath = os.path.join(PROCESSED_FOLDER, session_id, result_files[0])
                    with open(filepath, 'r') as f:
                        analysis_data = json.load(f)
                    report['analyses'].append({
                        'id': analysis_id,
                        'type': result_files[0].split('_')[1].split('.')[0],
                        'data': analysis_data
                    })
            except Exception as e:
                logger.error(f"Error loading analysis {analysis_id}: {str(e)}")
                continue

        # Generate summary recommendations
        if report['analyses']:
            report['summary']['recommendations'] = [
                "Review data quality scores and address high-priority issues",
                "Consider feature engineering for improved model performance",
                "Validate statistical assumptions before drawing conclusions",
                "Monitor model performance with cross-validation results"
            ]

        # Save report
        report_id = str(uuid.uuid4())
        report_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(report_dir, exist_ok=True)
        report_filepath = os.path.join(report_dir, f"{report_id}_report.json")

        with open(report_filepath, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        return jsonify({
            'reportId': report_id,
            'message': 'Report generated successfully',
            'downloadUrl': f'/api/download/{report_id}?sessionId={session_id}&format=json'
        })

    except Exception as e:
        logger.error(f"Report export error: {str(e)}")
        return jsonify({'error': str(e)}), 500


# Update existing endpoints with enhanced functionality
@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Enhanced preview with data insights
        preview_data = {
            'basic_info': {
                'columns': df.columns.tolist(),
                'dtypes': df.dtypes.astype(str).to_dict(),
                'shape': df.shape,
                'memory_usage': df.memory_usage(deep=True).sum()
            },
            'sample_data': {
                'head': df.head(5).to_dict('records'),
                'tail': df.tail(5).to_dict('records')
            },
            'data_quality': {
                'missing_values': df.isnull().sum().to_dict(),
                'duplicate_rows': df.duplicated().sum(),
                'unique_values': df.nunique().to_dict()
            },
            'quick_stats': {}
        }

        # Quick statistics for numeric columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            preview_data['quick_stats']['numeric'] = df[numeric_cols].describe().to_dict()

        # Quick statistics for categorical columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(categorical_cols) > 0:
            preview_data['quick_stats']['categorical'] = {}
            for col in categorical_cols[:5]:  # Limit to first 5 categorical columns
                preview_data['quick_stats']['categorical'][col] = {
                    'top_values': df[col].value_counts().head(5).to_dict()
                }

        return jsonify(preview_data)

    except Exception as e:
        logger.error(f"Preview error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/', methods=['GET'])
def home():
    return jsonify({
        'message': 'Enterprise Data Analytics Platform',
        'version': '2.0.0-enterprise',
        'features': {
            'core': ['data_profiling', 'quality_assessment', 'statistical_tests'],
            'machine_learning': ['automl', 'clustering', 'feature_engineering'],
            'time_series': ['trend_analysis', 'forecasting', 'anomaly_detection'],
            'visualization': ['advanced_charts', 'interactive_plots', 'correlation_heatmaps'],
            'enterprise': ['report_generation', 'analysis_history', 'data_governance']
        },
        'endpoints': {
            'data_management': ['/api/upload', '/api/preview/<file_id>', '/api/profile/<file_id>'],
            'analytics': ['/api/automl', '/api/clustering', '/api/timeseries'],
            'quality': ['/api/data-quality', '/api/statistical-tests'],
            'visualization': ['/api/advanced-visualization'],
            'enterprise': ['/api/export-report', '/api/analysis-history/<session_id>']
        },
        'timestamp': datetime.now().isoformat()
    })


if __name__ == '__main__':
    # Debug mode disabled; for production traffic, serve through a WSGI server instead
    app.run(host='0.0.0.0', port=7860, debug=False)
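
# Deployment note (a suggestion, not part of the original code): Flask's built-in server is
# meant for development only. A common alternative is a WSGI server such as gunicorn,
# assuming this module is saved as app.py (the module name here is an assumption):
#
#     gunicorn --workers 4 --bind 0.0.0.0:7860 app:app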