# Data-Vision / app.py
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
from scipy import stats
import plotly.colors as pc
import joblib
from io import StringIO
import requests
import asyncio
from io import BytesIO
import base64
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, callbacks
from tensorflow.keras.utils import to_categorical
import mimetypes
import seaborn as sns
from datetime import datetime
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score, learning_curve
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVR, SVC
from sklearn.feature_selection import SelectKBest
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.neural_network import MLPRegressor, MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.preprocessing import RobustScaler, StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from ydata_profiling import ProfileReport
from streamlit_pandas_profiling import st_profile_report
# Enhanced configuration
st.set_page_config(
page_title="Executive Insights Pro",
layout="wide",
page_icon="📈",
initial_sidebar_state="expanded"
)
# Initial session state setup
if 'raw_data' not in st.session_state:
st.session_state.raw_data = None
if 'cleaned_data' not in st.session_state:
st.session_state.cleaned_data = None
# Security: Set allowed file types
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'parquet', 'feather'}
MAX_FILE_SIZE_MB = 250 # 250MB limit
def validate_file(file):
"""Comprehensive file validation"""
if not file:
return False, "No file uploaded"
extension = file.name.split('.')[-1].lower()
if extension not in ALLOWED_EXTENSIONS:
return False, f"Unsupported file type: {extension}"
file_size_mb = file.size / (1024 * 1024)
if file_size_mb > MAX_FILE_SIZE_MB:
return False, f"File size exceeds {MAX_FILE_SIZE_MB}MB limit"
return True, ""
@st.cache_data(ttl=3600, show_spinner="Analyzing data quality...")
def enhanced_quality_report(df):
"""Generate comprehensive data quality report"""
report = {
'basic_stats': {
'rows': df.shape[0],
'columns': df.shape[1],
'missing_values': df.isna().sum().sum(),
'duplicates': df.duplicated().sum()
},
'column_analysis': {},
'data_health_score': 100 # Starting score
}
for col in df.columns:
col_report = {
'type': str(df[col].dtype),
'unique': df[col].nunique(),
'missing': df[col].isna().sum(),
'samples': df[col].dropna().sample(min(3, df[col].count())).tolist() if df[col].dtype == 'object' else []
}
# Numeric specific checks
if pd.api.types.is_numeric_dtype(df[col]):
col_report.update({
'mean': df[col].mean(),
'std': df[col].std(),
'zeros': (df[col] == 0).sum(),
'negatives': 0 if pd.api.types.is_unsigned_integer_dtype(df[col]) else (df[col] < 0).sum(),
'outliers': detect_outliers(df[col])
})
report['data_health_score'] -= 2 # Deduct 2% per numeric column
# Categorical specific checks
if pd.api.types.is_string_dtype(df[col]):
col_report.update({
'top_value': df[col].mode().iloc[0] if not df[col].mode().empty else None,
'top_freq': df[col].value_counts().iloc[0] / len(df) if df[col].count() > 0 else 0
})
report['data_health_score'] -= 1 # Deduct 1% per string column
report['column_analysis'][col] = col_report
report['data_health_score'] = max(report['data_health_score'], 0)
return report
def detect_outliers(series):
"""Detect outliers using IQR method"""
q1 = series.quantile(0.25)
q3 = series.quantile(0.75)
iqr = q3 - q1
return ((series < (q1 - 1.5 * iqr)) | (series > (q3 + 1.5 * iqr))).sum()
# Define app_mode for navigation
app_mode = st.sidebar.selectbox(
"Select Page",
["Data Upload", "Smart Cleaning", "Advanced EDA", "Model Training", "Insights", "Predictions", "Neural Network Studio"],
help="Choose the section to navigate to."
)
# --- Data Upload Page ---
if app_mode == "Data Upload":
st.title("📤 Smart Data Hub")
st.markdown("""
**Upload your dataset** (CSV, Excel, Parquet) for comprehensive analysis.
Get instant data health insights and quality assessment.
""")
# File upload with enhanced UI
uploaded_file = st.file_uploader(
"Drag & drop or browse files",
type=list(ALLOWED_EXTENSIONS),
help=f"Max file size: {MAX_FILE_SIZE_MB}MB. Supported formats: {', '.join(ALLOWED_EXTENSIONS)}"
)
if uploaded_file:
# Validate file
is_valid, message = validate_file(uploaded_file)
if not is_valid:
st.error(f"Upload error: {message}")
st.stop()
# Load data with progress
with st.spinner(f"Loading {uploaded_file.name}..."):
try:
if uploaded_file.name.endswith('.csv'):
df = pd.read_csv(uploaded_file, low_memory=False)
elif uploaded_file.name.endswith(('.xlsx', '.xls')):
df = pd.read_excel(uploaded_file)
elif uploaded_file.name.endswith('.parquet'):
df = pd.read_parquet(uploaded_file)
elif uploaded_file.name.endswith('.feather'):
df = pd.read_feather(uploaded_file)
st.session_state.raw_data = df
st.success("Dataset loaded successfully!")
except Exception as e:
st.error(f"Error loading file: {str(e)}")
st.stop()
# Reset any previously trained model when new data is uploaded
if uploaded_file is not None:
st.session_state.model = None
st.session_state.preprocessor = None
# Data Health Dashboard
st.subheader("📊 Data Health Dashboard")
report = enhanced_quality_report(df)
col1, col2, col3, col4 = st.columns(4)
col1.metric("Total Rows", report['basic_stats']['rows'])
col2.metric("Total Columns", report['basic_stats']['columns'])
col3.metric("Missing Values", report['basic_stats']['missing_values'])
col4.metric("Data Health Score", f"{report['data_health_score']}/100")
# Column Explorer
with st.expander("🔍 Deep Column Analysis", expanded=True):
selected_col = st.selectbox("Select column to inspect", df.columns)
col_info = report['column_analysis'][selected_col]
st.write(f"**Type:** {col_info['type']}")
st.write(f"**Unique Values:** {col_info['unique']}")
st.write(f"**Missing Values:** {col_info['missing']} ({col_info['missing']/len(df):.1%})")
if pd.api.types.is_numeric_dtype(df[selected_col]):
st.write("**Distribution:**")
st.line_chart(df[selected_col])
st.write(f"**Outliers Detected:** {col_info['outliers']}")
else:
st.write("**Most Common Values:**")
top_values = df[selected_col].value_counts().head(5)
st.bar_chart(top_values)
# Smart Recommendations
with st.expander("💡 Cleaning Recommendations"):
recommendations = []
if report['basic_stats']['duplicates'] > 0:
recommendations.append(f"🚨 Remove {report['basic_stats']['duplicates']} duplicate rows")
if report['basic_stats']['missing_values'] > 0:
recommendations.append("🔧 Apply advanced imputation strategies")
for col, data in report['column_analysis'].items():
if data['missing'] > 0.5 * len(df):
recommendations.append(f"⚠️ Consider dropping {col} (>{50}% missing)")
if data['unique'] == len(df):
recommendations.append(f"🔍 Investigate {col} - potential unique identifier")
if recommendations:
st.write("### Recommended Actions")
for rec in recommendations[:5]: # Show top 5
st.write(f"- {rec}")
else:
st.success("No critical issues detected - your data looks healthy!")
# Data Preview
with st.expander("🔎 Data Preview", expanded=True):
preview_size = st.slider("Preview rows", 5, 100, 15)
st.dataframe(df.head(preview_size).style.highlight_null(color='#FF6666'))
# Advanced Profiling
if st.button("🚀 Generate Full Data Profile"):
with st.spinner("Generating comprehensive report..."):
pr = ProfileReport(df, explorative=True)
st_profile_report(pr)
elif app_mode == "Smart Cleaning":
st.title("🧼 Intelligent Data Cleaning")
st.markdown("""
**Automated Data Cleaning** with smart suggestions and advanced transformations.
Clean your data with confidence using AI-powered recommendations.
""")
if 'raw_data' not in st.session_state or st.session_state.raw_data is None:
st.warning("Please upload your data in the Data Upload section first.")
st.stop()
# Initialize versioning
if 'data_versions' not in st.session_state:
st.session_state.data_versions = [st.session_state.raw_data.copy()]
st.session_state.current_version = 0
def update_version(new_df):
st.session_state.data_versions = st.session_state.data_versions[:st.session_state.current_version+1]
st.session_state.data_versions.append(new_df.copy())
st.session_state.current_version += 1
df = st.session_state.data_versions[st.session_state.current_version].copy()
if 'cleaning_actions' not in st.session_state:
    st.session_state.cleaning_actions = []
cleaning_actions = st.session_state.cleaning_actions  # persist the cleaning log across reruns
# Version Control with Progress Bar
with st.expander("⏪ Version Control", expanded=True):
st.caption(f"Current Version: {st.session_state.current_version+1}/{len(st.session_state.data_versions)}")
progress = (st.session_state.current_version + 1) / len(st.session_state.data_versions)
st.progress(progress)
col1, col2 = st.columns(2)
with col1:
if st.button("⏮️ Undo Last Action", disabled=st.session_state.current_version == 0):
st.session_state.current_version -= 1
st.rerun()
with col2:
if st.button("⏭️ Redo Next Action", disabled=st.session_state.current_version == len(st.session_state.data_versions)-1):
st.session_state.current_version += 1
st.rerun()
# Data Health Dashboard with Cards
st.subheader("📊 Data Health Dashboard")
with st.expander("Show Comprehensive Data Report", expanded=True):
pr = ProfileReport(df, explorative=True)
st_profile_report(pr)
# Enhanced Health Summary with Cards
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Total Rows", len(df), help="Number of rows in the dataset")
with col2:
st.metric("Total Columns", len(df.columns), help="Number of columns in the dataset")
with col3:
missing_pct = df.isna().mean().mean()
st.metric("Missing Values", f"{missing_pct:.1%}", help="Percentage of missing values in the dataset")
with col4:
duplicates = df.duplicated().sum()
st.metric("Duplicates", duplicates, help="Number of duplicate rows in the dataset")
# Visualizations for Data Health
st.markdown("### 📈 Data Health Visualizations")
col1, col2 = st.columns(2)
with col1:
st.plotly_chart(px.bar(df.isna().sum(), title="Missing Values per Column",
labels={'index': 'Column', 'value': 'Missing Count'},
color=df.isna().sum(), color_continuous_scale="Bluered"))
with col2:
st.plotly_chart(px.pie(values=df.dtypes.value_counts(), names=df.dtypes.value_counts().index,
title="Data Type Distribution", hole=0.3))
# Cleaning Operations with Tabs
st.subheader("🔧 Cleaning Operations")
tab1, tab2, tab3, tab4 = st.tabs(["Missing Values", "Duplicates", "Data Types", "Outliers"])
# 1. Missing Value Handling
with tab1:
st.markdown("### 🕳️ Handle Missing Values")
missing_cols = df.columns[df.isna().any()].tolist()
if missing_cols:
st.write("Columns with missing values:")
cols = st.multiselect("Select columns to clean", missing_cols, default=missing_cols)
method = st.radio("Imputation Method", [
"Drop Missing",
"Mean/Median/Mode",
"KNN Imputation",
"MICE Imputation",
"Deep Learning Imputation"
], horizontal=True)
if st.button(f"Apply {method}"):
try:
original_df = df.copy()
# Imputation logic here...
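# Minimal imputation sketch for the options above (a hedged fill-in, not the original logic);
# KNNImputer / IterativeImputer come from the top-level sklearn imports, and the
# "Deep Learning Imputation" option is left as a no-op here.
if method == "Drop Missing":
    df = df.dropna(subset=cols)
elif method == "Mean/Median/Mode":
    for c in cols:
        if pd.api.types.is_numeric_dtype(df[c]):
            df[c] = df[c].fillna(df[c].median())
        elif not df[c].mode().empty:
            df[c] = df[c].fillna(df[c].mode().iloc[0])
elif method in ("KNN Imputation", "MICE Imputation"):
    num_cols = [c for c in cols if pd.api.types.is_numeric_dtype(df[c])]
    if num_cols:
        imputer = KNNImputer(n_neighbors=5) if method == "KNN Imputation" else IterativeImputer(random_state=42)
        df[num_cols] = imputer.fit_transform(df[num_cols])
else:
    st.info("Deep Learning Imputation is not implemented in this sketch; data left unchanged.")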
cleaning_actions.append(f"Applied {method} on {cols}")
update_version(df)
st.success(f"{method} applied successfully! ✅")
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.success("✨ No missing values found!")
# 2. Duplicate Handling
with tab2:
st.markdown("### 🔄 Handle Duplicates")
duplicates = df.duplicated().sum()
if duplicates > 0:
st.plotly_chart(px.histogram(df, x=df.duplicated(), title="Duplicate Distribution"))
dup_strategy = st.radio("Duplicate Strategy", [
"Remove All Duplicates",
"Keep First Occurrence",
"Keep Last Occurrence"
])
if st.button("Handle Duplicates"):
original_count = len(df)
df = df.drop_duplicates(keep={
"Remove All Duplicates": False,
"Keep First Occurrence": 'first',
"Keep Last Occurrence": 'last'
}[dup_strategy])
cleaning_actions.append(f"Removed {original_count - len(df)} duplicates")
update_version(df)
st.success(f"Removed {original_count - len(df)} duplicates! ✅")
else:
st.success("✨ No duplicates found!")
# 3. Data Type Conversion
with tab3:
st.markdown("### 🔄 Convert Data Types")
col1, col2 = st.columns(2)
with col1:
st.dataframe(df.dtypes.reset_index().rename(columns={0: 'Type', 'index': 'Column'}))
with col2:
col_to_convert = st.selectbox("Select column to convert", df.columns)
new_type = st.selectbox("New Data Type", [
"String", "Integer", "Float",
"Boolean", "Datetime", "Category"
])
if st.button("Convert Data Type"):
try:
# Conversion logic here...
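# A hedged conversion sketch for the type options above; Integer/Float use pd.to_numeric
# with errors='coerce' so unparseable values become NaN instead of raising.
if new_type == "String":
    df[col_to_convert] = df[col_to_convert].astype(str)
elif new_type == "Integer":
    df[col_to_convert] = pd.to_numeric(df[col_to_convert], errors='coerce').astype('Int64')
elif new_type == "Float":
    df[col_to_convert] = pd.to_numeric(df[col_to_convert], errors='coerce')
elif new_type == "Boolean":
    df[col_to_convert] = df[col_to_convert].astype(bool)
elif new_type == "Datetime":
    df[col_to_convert] = pd.to_datetime(df[col_to_convert], errors='coerce')
elif new_type == "Category":
    df[col_to_convert] = df[col_to_convert].astype('category')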
cleaning_actions.append(f"Converted {col_to_convert} to {new_type}")
update_version(df)
st.success("Data type converted successfully! ✅")
except Exception as e:
st.error(f"Conversion failed: {str(e)}")
# 4. Outlier Handling
with tab4:
st.markdown("### 📈 Handle Outliers")
numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
if numeric_cols:
outlier_col = st.selectbox("Select numeric column", numeric_cols)
st.plotly_chart(px.box(df, y=outlier_col, title="Outlier Distribution"))
if st.button("Remove Outliers"):
# Outlier removal logic here...
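# IQR-based removal sketch mirroring detect_outliers() above: keep rows whose value for
# the selected column falls inside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
q1, q3 = df[outlier_col].quantile([0.25, 0.75])
iqr = q3 - q1
df = df[df[outlier_col].between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)]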
cleaning_actions.append(f"Removed outliers from {outlier_col}")
update_version(df)
st.success("Outliers removed successfully! ✅")
else:
st.info("ℹ️ No numeric columns found for outlier detection")
# Save Cleaned Data with Enhanced Feedback
if st.button("💾 Save Cleaned Data"):
st.session_state.cleaned_data = df
st.balloons()
# Generate comprehensive report
pr = ProfileReport(df, title="Cleaned Data Report")
st_profile_report(pr)
# Show cleaning log with diffs
st.subheader("📝 Cleaning Log")
st.table(pd.DataFrame({
"Step": range(1, len(cleaning_actions)+1),
"Action": cleaning_actions
}))
# Show dataset comparison
col1, col2 = st.columns(2)
with col1:
st.write("Original Data Shape:", st.session_state.raw_data.shape)
with col2:
st.write("Cleaned Data Shape:", df.shape)
st.success("✅ Cleaned data saved successfully! You can now proceed to analysis.")
elif app_mode == "Advanced EDA":
st.title("🔍 Advanced Exploratory Data Analysis")
st.markdown("""
**Interactive Data Exploration** with optimized visualizations for fast insights.
Uncover patterns and relationships in your data with beautiful, responsive plots.
""")
if 'cleaned_data' not in st.session_state or st.session_state.cleaned_data is None:
st.warning("Please clean your data in the Smart Cleaning section first.")
st.stop()
df = st.session_state.cleaned_data.copy()
# Initialize session state for EDA configuration
if 'eda_config' not in st.session_state:
st.session_state.eda_config = {
'plot_type': "Histogram",
'x_col': df.columns[0] if len(df.columns) > 0 else None,
'y_col': df.columns[1] if len(df.columns) > 1 else None,
'z_col': df.columns[2] if len(df.columns) > 2 else None,
'color_col': None,
'facet_col': None,
'hover_data_cols': [],
'color_palette': "Viridis",
'filter_col': None,
'filter_options': []
}
# Main Layout Columns
col1, col2 = st.columns([1, 3])
with col1:
st.header("📊 Visualization Setup")
# Plot Type Selection
plot_types = {
"Distribution": ["Histogram", "Box Plot", "Violin Plot", "Density Plot"],
"Relationship": ["Scatter Plot", "Line Plot", "Heatmap", "Pair Plot"],
"Comparison": ["Bar Chart", "Pie Chart", "Parallel Coordinates"],
"3D": ["3D Scatter", "3D Surface"]
}
selected_category = st.selectbox("Plot Category", list(plot_types.keys()))
st.session_state.eda_config['plot_type'] = st.selectbox(
"Plot Type",
plot_types[selected_category]
)
# Dynamic Column Selectors
plot_type = st.session_state.eda_config['plot_type']
if plot_type in ["Histogram", "Box Plot", "Violin Plot", "Density Plot", "Bar Chart", "Pie Chart"]:
st.session_state.eda_config['x_col'] = st.selectbox(
"X Axis",
df.columns,
index=df.columns.get_loc(st.session_state.eda_config['x_col'])
if st.session_state.eda_config['x_col'] in df.columns else 0
)
if plot_type in ["Scatter Plot", "Line Plot", "Box Plot", "Violin Plot", "Density Plot"]:
st.session_state.eda_config['y_col'] = st.selectbox(
"Y Axis",
df.columns,
index=df.columns.get_loc(st.session_state.eda_config['y_col'])
if st.session_state.eda_config['y_col'] in df.columns else 0
)
if plot_type in ["3D Scatter", "3D Surface"]:
st.session_state.eda_config['z_col'] = st.selectbox(
"Z Axis",
df.columns,
index=df.columns.get_loc(st.session_state.eda_config['z_col'])
if st.session_state.eda_config['z_col'] in df.columns else 0
)
# Additional Options
with st.expander("🎨 Customization"):
st.session_state.eda_config['color_col'] = st.selectbox(
"Color By",
[None] + list(df.columns)
)
st.session_state.eda_config['facet_col'] = st.selectbox(
"Facet By",
[None] + list(df.columns)
)
st.session_state.eda_config['hover_data_cols'] = st.multiselect(
"Hover Data",
df.columns
)
st.session_state.eda_config['color_palette'] = st.selectbox(
"Color Palette",
px.colors.named_colorscales()
)
# Data Filtering
with st.expander("🔎 Data Filtering"):
filter_col = st.selectbox(
"Filter Column",
[None] + list(df.columns)
)
if filter_col:
unique_values = df[filter_col].unique()
selected_values = st.multiselect(
f"Select {filter_col} values",
unique_values,
default=unique_values
)
df = df[df[filter_col].isin(selected_values)]
with col2:
st.header("📈 Visualization")
config = st.session_state.eda_config
@st.cache_data(ttl=300)
def generate_plot(df, plot_type, config):
"""Cached plot generation function for better performance"""
try:
if plot_type == "Histogram":
return px.histogram(
df, x=config['x_col'],
color=config['color_col'],
nbins=30
)
elif plot_type == "Scatter Plot":
return px.scatter(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col'],
hover_data=config['hover_data_cols']
)
elif plot_type == "Box Plot":
return px.box(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col']
)
elif plot_type == "Violin Plot":
return px.violin(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col'],
box=True
)
elif plot_type == "Heatmap":
numeric_df = df.select_dtypes(include=np.number)
corr = numeric_df.corr()
return px.imshow(
corr,
text_auto=True,
color_continuous_scale=config['color_palette']
)
elif plot_type == "3D Scatter":
return px.scatter_3d(
df, x=config['x_col'], y=config['y_col'], z=config['z_col'],
color=config['color_col']
)
elif plot_type == "Bar Chart":
return px.bar(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col']
)
elif plot_type == "Pie Chart":
return px.pie(
df, names=config['x_col'], values=config['y_col']
)
elif plot_type == "Line Plot":
return px.line(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col']
)
elif plot_type == "Pair Plot":
numeric_cols = df.select_dtypes(include=np.number).columns
return px.scatter_matrix(
df[numeric_cols],
color=config['color_col']
)
elif plot_type == "Parallel Coordinates":
numeric_df = df.select_dtypes(include=np.number)
return px.parallel_coordinates(
numeric_df,
color_continuous_scale=config['color_palette']
)
elif plot_type == "Density Plot":
return px.density_contour(
df, x=config['x_col'], y=config['y_col'],
color=config['color_col']
)
except Exception as e:
st.error(f"Plot generation error: {str(e)}")
return None
# Generate and display plot
fig = generate_plot(df, plot_type, config)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Plot Statistics
with st.expander("📊 Plot Statistics"):
if plot_type in ["Histogram", "Box Plot", "Violin Plot"]:
st.write(f"**{config['x_col']} Statistics**")
st.table(df[config['x_col']].describe())
if plot_type in ["Scatter Plot", "Line Plot"]:
st.write(f"**Correlation between {config['x_col']} and {config['y_col']}**")
corr = df[[config['x_col'], config['y_col']]].corr().iloc[0,1]
st.metric("Pearson Correlation", f"{corr:.2f}")
if plot_type == "Heatmap":
st.write("**Correlation Matrix**")
numeric_df = df.select_dtypes(include=np.number)
st.dataframe(numeric_df.corr())
# Data Summary Section
st.header("📝 Data Summary")
with st.expander("Show Data Summary"):
col1, col2 = st.columns(2)
with col1:
st.write("**Data Shape**")
st.write(f"Rows: {df.shape[0]}")
st.write(f"Columns: {df.shape[1]}")
with col2:
st.write("**Data Types**")
st.dataframe(df.dtypes.reset_index().rename(columns={
'index': 'Column', 0: 'Type'
}))
st.write("**Sample Data**")
st.dataframe(df.head())
# Model Training Section
elif app_mode == "Model Training":
st.title("🚂 Model Training Studio")
st.markdown("""
**Train and Evaluate Machine Learning Models** with advanced hyperparameter tuning and performance tracking.
Choose from a wide range of algorithms and configurations.
""")
# Initialize session state variables
if 'model' not in st.session_state:
st.session_state.model = None
if 'preprocessor' not in st.session_state:
st.session_state.preprocessor = None
if 'X_train_selected' not in st.session_state:
st.session_state.X_train_selected = None
if 'X_test_selected' not in st.session_state:
st.session_state.X_test_selected = None
if 'y_train' not in st.session_state:
st.session_state.y_train = None
if 'y_test' not in st.session_state:
st.session_state.y_test = None
if 'cleaned_data' not in st.session_state or st.session_state.cleaned_data is None:
    st.warning("Please clean your data in the Smart Cleaning section first.")
    st.stop()
df = st.session_state.cleaned_data.copy()
# Target Variable Selection
st.subheader("🎯 Target Variable")
target_column = st.selectbox("Select Target Variable", df.columns, help="Choose the column to predict.")
# Problem Type Selection
st.subheader("📝 Problem Type")
problem_type = st.radio("Select Problem Type", ["Regression", "Classification"], help="Choose the type of machine learning problem.")
# Feature Selection
st.subheader("🔧 Feature Selection")
use_all_features = st.checkbox("Use All Features", value=True, help="Select to use all features for training. Deselect to manually choose features.")
if use_all_features:
feature_columns = df.drop(columns=[target_column]).columns.tolist()
else:
feature_columns = st.multiselect("Select Feature Columns", df.drop(columns=[target_column]).columns, help="Choose the features you want to use for prediction.")
# Model Selection
st.subheader("🤖 Model Selection")
if problem_type == "Regression":
model_options = ["Linear Regression", "Decision Tree", "Random Forest", "Gradient Boosting", "SVM", "Neural Network"]
else: # Classification
model_options = ["Logistic Regression", "Decision Tree", "Random Forest", "Gradient Boosting", "SVM", "Neural Network", "KNN", "Naive Bayes"]
model_name = st.selectbox("Select Model", model_options, help="Choose a model.")
# Hyperparameter Tuning
st.subheader("🎛️ Hyperparameter Tuning")
with st.expander("Configure Hyperparameters", expanded=True):
if model_name == "Random Forest":
n_estimators = st.slider("Number of Estimators", 10, 200, 100)
max_depth = st.slider("Max Depth", 3, 20, 10)
min_samples_split = st.slider("Min Samples Split", 2, 10, 2)
min_samples_leaf = st.slider("Min Samples Leaf", 1, 10, 1)
hyperparams = {
'n_estimators': n_estimators,
'max_depth': max_depth,
'min_samples_split': min_samples_split,
'min_samples_leaf': min_samples_leaf
}
elif model_name == "Gradient Boosting":
learning_rate = st.slider("Learning Rate", 0.01, 1.0, 0.1)
n_estimators = st.slider("Number of Estimators", 10, 200, 100)
max_depth = st.slider("Max Depth", 3, 20, 10)
hyperparams = {
'learning_rate': learning_rate,
'n_estimators': n_estimators,
'max_depth': max_depth
}
elif model_name == "Neural Network":
hidden_layers = st.slider("Number of Hidden Layers", 1, 5, 2)
neurons_per_layer = st.slider("Neurons per Layer", 10, 200, 50)
epochs = st.slider("Epochs", 10, 200, 50)
batch_size = st.slider("Batch Size", 16, 128, 32)
hyperparams = {
'hidden_layers': hidden_layers,
'neurons_per_layer': neurons_per_layer,
'epochs': epochs,
'batch_size': batch_size
}
else:
hyperparams = {}
# Train-Test Split
st.subheader("✂️ Train-Test Split")
test_size = st.slider("Test Size", 0.1, 0.5, 0.2, help="Proportion of the dataset to include in the test split.")
# Model Training
if st.button("🚀 Train Model"):
with st.spinner("Training model..."):
try:
X = df[feature_columns]
y = df[target_column]
# Check if X is empty
if X.empty:
st.error("No features were selected. Please select feature columns.")
st.stop()
# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
# Preprocessing Pipeline
numeric_features = X.select_dtypes(include=np.number).columns
categorical_features = X.select_dtypes(exclude=np.number).columns
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)
# Model Training
if model_name == "Linear Regression":
model = LinearRegression()
elif model_name == "Logistic Regression":
model = LogisticRegression(max_iter=1000)
elif model_name == "Decision Tree":
if problem_type == "Regression":
model = DecisionTreeRegressor()
else:
model = DecisionTreeClassifier()
elif model_name == "Random Forest":
if problem_type == "Regression":
model = RandomForestRegressor(**hyperparams)
else:
model = RandomForestClassifier(**hyperparams)
elif model_name == "Gradient Boosting":
if problem_type == "Regression":
model = GradientBoostingRegressor(**hyperparams)
else:
model = GradientBoostingClassifier(**hyperparams)
elif model_name == "SVM":
if problem_type == "Regression":
model = SVR()
else:
model = SVC()
elif model_name == "Neural Network":
if problem_type == "Regression":
model = MLPRegressor(
hidden_layer_sizes=[hyperparams['neurons_per_layer']] * hyperparams['hidden_layers'],
max_iter=hyperparams['epochs'],
batch_size=hyperparams['batch_size']
)
else:
model = MLPClassifier(
hidden_layer_sizes=[hyperparams['neurons_per_layer']] * hyperparams['hidden_layers'],
max_iter=hyperparams['epochs'],
batch_size=hyperparams['batch_size']
)
elif model_name == "KNN":
model = KNeighborsClassifier()
elif model_name == "Naive Bayes":
model = GaussianNB()
# Train the model
model.fit(X_train_processed, y_train)
# Store model and preprocessor
st.session_state.model = Pipeline(steps=[('preprocessor', preprocessor), ('model', model)])
st.session_state.preprocessor = preprocessor
# Store the test data
st.session_state.X_train_selected = X_train_processed
st.session_state.X_test_selected = X_test_processed
st.session_state.y_train = y_train
st.session_state.y_test = y_test
# Model Evaluation
y_pred = model.predict(X_test_processed)
if problem_type == "Regression":
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
st.write(f"Mean Squared Error: {mse:.4f}")
st.write(f"Root Mean Squared Error: {rmse:.4f}")
st.write(f"Mean Absolute Error: {mae:.4f}")
st.write(f"R-squared: {r2:.4f}")
else:
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
st.write(f"Accuracy: {accuracy:.4f}")
st.write(f"Precision: {precision:.4f}")
st.write(f"Recall: {recall:.4f}")
st.write(f"F1 Score: {f1:.4f}")
st.write("Classification Report:")
st.text(classification_report(y_test, y_pred))
# Visualization
st.subheader("📊 Model Performance Visualization")
if problem_type == "Regression":
fig, ax = plt.subplots()
ax.scatter(y_test, y_pred)
ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2)
ax.set_xlabel('Actual')
ax.set_ylabel('Predicted')
ax.set_title('Actual vs Predicted')
st.pyplot(fig)
else:
conf_matrix = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots()
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Confusion Matrix')
st.pyplot(fig)
st.success("Model trained successfully!")
except Exception as e:
st.error(f"An error occurred during training: {e}")
if st.session_state.model is not None:
st.subheader("💾 Save Model")
model_filename = st.text_input("Enter Model Filename (without extension)", "trained_model")
if st.button("Save Model"):
try:
joblib.dump(st.session_state.model, f"{model_filename}.joblib")
st.success(f"Model saved as {model_filename}.joblib")
except Exception as e:
st.error(f"Error saving model: {e}")
else:
st.warning("No trained model available. Train a model first to enable saving.")
# Visualization Lab Section
elif app_mode == "Visualization Lab":
st.title("🔬 Visualization Lab")
st.markdown("""
**Explore and Visualize Your Data** with advanced plotting tools and interactive visualizations.
Uncover hidden patterns and relationships in your data.
""")
if 'cleaned_data' not in st.session_state or st.session_state.cleaned_data is None:
st.warning("Please clean your data in the Smart Cleaning section first.")
st.stop()
df = st.session_state.cleaned_data.copy()
# Visualization Type Selection
st.subheader("📊 Choose Visualization Type")
plot_types = [
"Histogram", "Scatter Plot", "Box Plot", "Violin Plot",
"Correlation Heatmap", "Parallel Coordinates", "Andrews Curves",
"Pair Plot", "Density Contour", "3D Scatter", "Time Series",
"Sunburst Chart", "Funnel Chart", "Clustering Analysis"
]
plot_type = st.selectbox("Select Visualization Type", plot_types)
# Dynamic Controls Based on Plot Type
if plot_type != "Correlation Heatmap":
x_col = st.selectbox("X Axis", df.columns)
if plot_type in ["Scatter Plot", "Box Plot", "Violin Plot", "Time Series", "3D Scatter", "Histogram"]:
y_col = st.selectbox("Y Axis", df.columns)
if plot_type == "3D Scatter":
z_col = st.selectbox("Z Axis", df.columns)
color_col = st.selectbox("Color by", [None] + list(df.columns))
# Advanced Plot Customization
with st.expander("🎨 Advanced Customization", expanded=False):
color_palette = st.selectbox("Color Palette", ["Viridis", "Plasma", "Magma", "Cividis", "RdBu", "Rainbow"])
hover_data_cols = st.multiselect("Hover Data", df.columns)
# Plot Generation
try:
fig = None
if plot_type == "Histogram":
fig = px.histogram(
df, x=x_col, y=y_col,
nbins=30, template="plotly_dark"
)
elif plot_type == "Scatter Plot":
fig = px.scatter(
df, x=x_col, y=y_col,
color=color_col,
hover_data=hover_data_cols
)
elif plot_type == "3D Scatter":
fig = px.scatter_3d(
df, x=x_col, y=y_col, z=z_col,
color=color_col
)
elif plot_type == "Correlation Heatmap":
numeric_df = df.select_dtypes(include=np.number)
if not numeric_df.empty:
corr = numeric_df.corr()
fig = px.imshow(
corr, text_auto=True,
color_continuous_scale=color_palette
)
else:
st.warning("No numerical columns found for correlation heatmap.")
elif plot_type == "Box Plot":
fig = px.box(
df, x=x_col, y=y_col,
color=color_col
)
elif plot_type == "Violin Plot":
fig = px.violin(
df, x=x_col, y=y_col,
box=True, points="all",
color=color_col
)
elif plot_type == "Time Series":
df = df.sort_values(by=x_col)
fig = px.line(
df, x=x_col, y=y_col,
color=color_col
)
elif plot_type == "Scatter Matrix":
fig = px.scatter_matrix(
df, dimensions=[x_col, y_col],
color=color_col
)
if fig:
st.plotly_chart(fig, use_container_width=True)
except Exception as e:
st.error(f"An error occurred while generating the plot: {e}")
# Statistical Analysis Section
with st.expander("📊 Statistical Analysis", expanded=True):
analysis_type = st.selectbox("Select Analysis Type", [
"Descriptive Statistics",
"Correlation Analysis",
"Hypothesis Testing",
"Distribution Fitting"
])
if analysis_type == "Descriptive Statistics":
st.write(df.describe(include='all'))
elif analysis_type == "Correlation Analysis":
numeric_cols = df.select_dtypes(include=np.number).columns
if len(numeric_cols) >= 2:
corr_method = st.selectbox("Correlation Method", [
"Pearson", "Kendall", "Spearman"
])
corr_matrix = df[numeric_cols].corr(method=corr_method.lower())
st.write(corr_matrix)
fig_corr = px.imshow(corr_matrix, text_auto=True, color_continuous_scale=color_palette)
st.plotly_chart(fig_corr, use_container_width=True)
else:
st.warning("Need at least 2 numeric columns for correlation analysis")
elif analysis_type == "Hypothesis Testing":
test_type = st.selectbox("Select Test Type", [
"T-test", "Chi-Squared Test", "ANOVA", "Mann-Whitney U"
])
if test_type == "T-test":
col1 = st.selectbox("Column 1 (Numeric)", df.select_dtypes(include=np.number).columns)
col2 = st.selectbox("Column 2 (Categorical)", df.select_dtypes(include='object').columns)
if st.button("Run T-test"):
groups = df.groupby(col2)[col1].apply(list)
if len(groups) == 2:
t_stat, p_value = stats.ttest_ind(groups.iloc[0], groups.iloc[1])
st.write(f"T-statistic: {t_stat:.4f}")
st.write(f"P-value: {p_value:.4f}")
if p_value < 0.05:
st.write("Reject the null hypothesis.")
else:
st.write("Fail to reject the null hypothesis.")
else:
st.write("Select a categorical column with exactly two categories.")
elif analysis_type == "Distribution Fitting":
numeric_col = st.selectbox("Select Numeric Column", df.select_dtypes(include=np.number).columns)
dist_types = ["Normal", "Log-Normal", "Exponential", "Gamma"]
selected_dist = st.selectbox("Select Distribution Type", dist_types)
if st.button("Fit Distribution"):
from scipy.stats import norm, lognorm, expon, gamma
dist_functions = {
"Normal": norm,
"Log-Normal": lognorm,
"Exponential": expon,
"Gamma": gamma
}
params = dist_functions[selected_dist].fit(df[numeric_col].dropna())
st.write(f"Fitted Parameters: {params}")
# Data Profiling Section
with st.expander("📝 Generate Full Data Profile", expanded=False):
if st.button("🚀 Generate Comprehensive Report"):
with st.spinner("Generating report..."):
pr = ProfileReport(df, explorative=True)
st_profile_report(pr)
# Insights Section
elif app_mode == "Insights":
st.title("📊 Model Insights & Explainability")
st.markdown("""
**Understand and Interpret Your Model** with advanced explainability tools and visualizations.
Gain deeper insights into model behavior and predictions.
""")
if 'model' not in st.session_state or st.session_state.model is None:
st.warning("Please train a model in the Model Training section first.")
st.stop()
model = st.session_state.model.steps[-1][1] # Get the trained model
preprocessor = st.session_state.model.steps[0][1] # Get the preprocessor
# Model Summary
st.subheader("📝 Model Summary")
st.write(f"**Model Type:** {type(model).__name__}")
st.write(f"**Problem Type:** {'Regression' if hasattr(model, 'predict') else 'Classification'}")
st.write(f"**Training Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Feature Importance
st.subheader("🔍 Feature Importance")
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
feature_names = preprocessor.get_feature_names_out()
importance_df = pd.DataFrame({
'Feature': feature_names,
'Importance': importances
}).sort_values('Importance', ascending=False)
fig, ax = plt.subplots()
sns.barplot(x='Importance', y='Feature', data=importance_df.head(10), ax=ax)
ax.set_title('Top 10 Feature Importances')
st.pyplot(fig)
else:
st.info("Feature importance not available for this model type.")
# SHAP Values
st.subheader("📊 SHAP Values")
if st.checkbox("Calculate SHAP Values (Warning: May be slow for large datasets)"):
try:
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(st.session_state.X_test_selected)
# Summary Plot
st.write("### Summary Plot")
fig, ax = plt.subplots()
shap.summary_plot(shap_values, st.session_state.X_test_selected, feature_names=preprocessor.get_feature_names_out())
st.pyplot(fig)
# Force Plot for Individual Predictions
st.write("### Individual Prediction Explanation")
sample_idx = st.slider("Select Sample Index", 0, len(st.session_state.X_test_selected)-1, 0)
fig, ax = plt.subplots()
shap.force_plot(explainer.expected_value, shap_values[sample_idx], st.session_state.X_test_selected[sample_idx],
feature_names=preprocessor.get_feature_names_out(), matplotlib=True, show=False)
st.pyplot(fig)
except Exception as e:
st.error(f"SHAP calculation failed: {e}")
# Partial Dependence Plots
st.subheader("📈 Partial Dependence Plots")
if hasattr(model, 'predict'):
feature_to_plot = st.selectbox("Select Feature for PDP", preprocessor.get_feature_names_out())
if st.button("Generate PDP"):
from sklearn.inspection import PartialDependenceDisplay
fig, ax = plt.subplots()
PartialDependenceDisplay.from_estimator(
model, st.session_state.X_test_selected,
features=[feature_to_plot],
feature_names=preprocessor.get_feature_names_out(),
ax=ax
)
st.pyplot(fig)
# Model Performance Over Time
st.subheader("⏳ Model Performance Over Time")
if st.checkbox("Track Performance Over Time"):
performance_history = {
'timestamp': [],
'metric': [],
'value': []
}
if hasattr(model, 'predict'):
y_pred = model.predict(st.session_state.X_test_selected)
mse = mean_squared_error(st.session_state.y_test, y_pred)
performance_history['timestamp'].append(datetime.now())
performance_history['metric'].append('MSE')
performance_history['value'].append(mse)
performance_df = pd.DataFrame(performance_history)
st.line_chart(performance_df.set_index('timestamp'))
# Model Debugging
st.subheader("🐛 Model Debugging")
if st.checkbox("Enable Debug Mode"):
st.write("### Model Parameters")
st.json(model.get_params())
st.write("### Training Data Summary")
st.write(f"Number of Samples: {st.session_state.X_train_selected.shape[0]}")
st.write(f"Number of Features: {st.session_state.X_train_selected.shape[1]}")
# Export Insights
st.subheader("💾 Export Insights")
if st.button("Export Insights as PDF"):
try:
from fpdf import FPDF
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.cell(200, 10, txt="Model Insights Report", ln=True, align='C')
pdf.cell(200, 10, txt=f"Model Type: {type(model).__name__}", ln=True)
pdf.cell(200, 10, txt=f"Problem Type: {'Regression' if hasattr(model, 'predict') else 'Classification'}", ln=True)
pdf.output("model_insights.pdf")
st.success("Insights exported successfully!")
except Exception as e:
st.error(f"Export failed: {e}")
# Predictions Section
elif app_mode == "Predictions":
st.title("🔮 Prediction Studio")
st.markdown("""
**Make Predictions** with your trained model and explore prediction explanations.
Generate batch predictions and export results.
""")
if 'model' not in st.session_state or st.session_state.model is None:
st.warning("Please train a model in the Model Training section first.")
st.stop()
model = st.session_state.model.steps[-1][1] # Get the trained model
preprocessor = st.session_state.model.steps[0][1] # Get the preprocessor
# Single Prediction
st.subheader("🎯 Single Prediction")
input_data = {}
feature_names = preprocessor.feature_names_in_  # raw input columns expected by the preprocessor
for feature in feature_names:
if feature in st.session_state.cleaned_data.columns:
if pd.api.types.is_numeric_dtype(st.session_state.cleaned_data[feature]):
input_data[feature] = st.number_input(f"Enter {feature}", value=st.session_state.cleaned_data[feature].mean())
else:
input_data[feature] = st.selectbox(f"Select {feature}", st.session_state.cleaned_data[feature].unique())
if st.button("Make Prediction"):
try:
input_df = pd.DataFrame([input_data])
input_processed = preprocessor.transform(input_df)
prediction = model.predict(input_processed)[0]
st.write(f"**Prediction:** {prediction}")
if hasattr(model, 'predict_proba'):
probabilities = model.predict_proba(input_processed)[0]
st.write("**Prediction Probabilities:**")
st.bar_chart(probabilities)
# SHAP Explanation
if st.checkbox("Show SHAP Explanation"):
try:
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(input_processed)
st.write("### SHAP Values")
fig, ax = plt.subplots()
shap.force_plot(explainer.expected_value, shap_values, input_processed,
feature_names=preprocessor.get_feature_names_out(), matplotlib=True, show=False)
st.pyplot(fig)
except Exception as e:
st.error(f"SHAP calculation failed: {e}")
except Exception as e:
st.error(f"Prediction failed: {e}")
# Batch Predictions
st.subheader("📂 Batch Predictions")
batch_file = st.file_uploader("Upload CSV for Batch Predictions", type=["csv"])
if batch_file is not None:
try:
batch_df = pd.read_csv(batch_file)
batch_processed = preprocessor.transform(batch_df)
batch_predictions = model.predict(batch_processed)
batch_df['Prediction'] = batch_predictions
if hasattr(model, 'predict_proba'):
probabilities = model.predict_proba(batch_processed)
for i in range(probabilities.shape[1]):
batch_df[f'Probability_Class_{i}'] = probabilities[:, i]
st.write("### Predictions Preview")
st.dataframe(batch_df.head())
# Download Predictions
csv = batch_df.to_csv(index=False)
b64 = base64.b64encode(csv.encode()).decode()
href = f'<a href="data:file/csv;base64,{b64}" download="predictions.csv">Download Predictions CSV</a>'
st.markdown(href, unsafe_allow_html=True)
except Exception as e:
st.error(f"Batch prediction failed: {e}")
# Prediction Analysis
st.subheader("📊 Prediction Analysis")
if st.checkbox("Analyze Predictions"):
try:
y_pred = model.predict(st.session_state.X_test_selected)
y_test = st.session_state.y_test
if not hasattr(model, 'predict_proba'):  # heuristic: regression-style model
fig, ax = plt.subplots()
ax.scatter(y_test, y_pred)
ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2)
ax.set_xlabel('Actual')
ax.set_ylabel('Predicted')
ax.set_title('Actual vs Predicted')
st.pyplot(fig)
else:
conf_matrix = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots()
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Confusion Matrix')
st.pyplot(fig)
except Exception as e:
st.error(f"Prediction analysis failed: {e}")
# Prediction Export
st.subheader("💾 Export Predictions")
if st.button("Export Predictions as PDF"):
try:
from fpdf import FPDF
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.cell(200, 10, txt="Predictions Report", ln=True, align='C')
pdf.cell(200, 10, txt=f"Model Type: {type(model).__name__}", ln=True)
pdf.cell(200, 10, txt=f"Problem Type: {'Regression' if hasattr(model, 'predict') else 'Classification'}", ln=True)
pdf.output("predictions_report.pdf")
st.success("Predictions exported successfully!")
except Exception as e:
st.error(f"Export failed: {e}")
# Neural Network Studio Section
elif app_mode == "Neural Network Studio":
st.title("🧠 Neural Network Studio")
st.markdown("""
**Build and Train Neural Networks** with advanced configurations and visualizations.
Explore deep learning models with ease.
""")
if 'cleaned_data' not in st.session_state or st.session_state.cleaned_data is None:
st.warning("Please clean your data in the Smart Cleaning section first.")
st.stop()
df = st.session_state.cleaned_data.copy()
# Target Variable Selection
st.subheader("🎯 Target Variable")
target_column = st.selectbox("Select Target Variable", df.columns, help="Choose the column to predict.")
# Problem Type Selection
st.subheader("📝 Problem Type")
problem_type = st.radio("Select Problem Type", ["Regression", "Classification"], help="Choose the type of machine learning problem.")
# Feature Selection
st.subheader("🔧 Feature Selection")
use_all_features = st.checkbox("Use All Features", value=True, help="Select to use all features for training. Deselect to manually choose features.")
if use_all_features:
feature_columns = df.drop(columns=[target_column]).columns.tolist()
else:
feature_columns = st.multiselect("Select Feature Columns", df.drop(columns=[target_column]).columns, help="Choose the features you want to use for prediction.")
# Neural Network Configuration
st.subheader("⚙️ Neural Network Configuration")
with st.expander("Configure Neural Network", expanded=True):
col1, col2 = st.columns(2)
with col1:
hidden_layers = st.slider("Number of Hidden Layers", 1, 5, 2)
neurons_per_layer = st.slider("Neurons per Layer", 10, 200, 50)
activation = st.selectbox("Activation Function",
["relu", "tanh", "sigmoid", "selu", "swish"])
dropout_rate = st.slider("Dropout Rate", 0.0, 0.5, 0.2)
initializer = st.selectbox("Weight Initializer",
["glorot_uniform", "he_normal", "lecun_uniform"])
with col2:
learning_rate = st.slider("Learning Rate", 0.0001, 0.1, 0.001, format="%.4f")
optimizer_choice = st.selectbox("Optimizer",
["Adam", "Nadam", "RMSprop", "SGD"])
batch_norm = st.checkbox("Batch Normalization", value=True)
regularization = st.checkbox("L2 Regularization")
epochs = st.slider("Epochs", 10, 200, 50)
batch_size = st.slider("Batch Size", 16, 128, 32)
# Train-Test Split
st.subheader("✂️ Train-Test Split")
test_size = st.slider("Test Size", 0.1, 0.5, 0.2, help="Proportion of the dataset to include in the test split.")
# Model Training
if st.button("🚀 Train Neural Network"):
with st.spinner("Training neural network..."):
try:
X = df[feature_columns]
y = df[target_column]
# Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
# Preprocessing Pipeline
numeric_features = X.select_dtypes(include=np.number).columns
categorical_features = X.select_dtypes(exclude=np.number).columns
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)
# Build neural network with advanced features
model = keras.Sequential()
model.add(layers.Input(shape=(X_train_processed.shape[1],)))
for _ in range(hidden_layers):
# Create configurable layers
layer_config = {
'units': neurons_per_layer,
'activation': activation,
'kernel_initializer': initializer
}
if regularization:
layer_config['kernel_regularizer'] = keras.regularizers.l2(0.01)
model.add(layers.Dense(**layer_config))
if batch_norm:
model.add(layers.BatchNormalization())
if dropout_rate > 0:
model.add(layers.Dropout(dropout_rate))
# Output layer
output_activation = 'linear' if problem_type == "Regression" else 'softmax'
output_units = 1 if problem_type == "Regression" else len(np.unique(y_train))
model.add(layers.Dense(output_units, activation=output_activation))
# Configure optimizer
optimizers = {
"Adam": keras.optimizers.Adam(learning_rate=learning_rate),
"Nadam": keras.optimizers.Nadam(learning_rate=learning_rate),
"RMSprop": keras.optimizers.RMSprop(learning_rate=learning_rate),
"SGD": keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
}
optimizer = optimizers[optimizer_choice]
# Compile the model
model.compile(optimizer=optimizer,
loss='mse' if problem_type == "Regression" else 'sparse_categorical_crossentropy',
metrics=['mae'] if problem_type == "Regression" else ['accuracy'])
# Add callbacks section
with st.expander("Advanced Training Options"):
early_stopping = st.checkbox("Early Stopping", value=True)
reduce_lr = st.checkbox("Reduce Learning Rate on Plateau")
patience = st.slider("Patience Epochs", 5, 20, 10) if early_stopping else 0
callbacks_list = []
if early_stopping:
callbacks_list.append(
callbacks.EarlyStopping(patience=patience, restore_best_weights=True))
if reduce_lr:
callbacks_list.append(
callbacks.ReduceLROnPlateau(factor=0.2, patience=patience//2))
# Train the model with callbacks
history = model.fit(
X_train_processed, y_train,
epochs=epochs,
batch_size=batch_size,
validation_split=0.2,
callbacks=callbacks_list,
verbose=0
)
# Store model and preprocessor
st.session_state.model = Pipeline(steps=[('preprocessor', preprocessor), ('model', model)])
st.session_state.preprocessor = preprocessor
# Store the test data
st.session_state.X_train_selected = X_train_processed
st.session_state.X_test_selected = X_test_processed
st.session_state.y_train = y_train
st.session_state.y_test = y_test
# Model Evaluation
y_pred = model.predict(X_test_processed)
# Post-processing for classification
if problem_type == "Classification":
y_pred = np.argmax(y_pred, axis=1) # Convert probabilities to class labels
if problem_type == "Regression":
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
st.write(f"Mean Squared Error: {mse:.4f}")
st.write(f"Root Mean Squared Error: {rmse:.4f}")
st.write(f"Mean Absolute Error: {mae:.4f}")
st.write(f"R-squared: {r2:.4f}")
else:
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
st.write(f"Accuracy: {accuracy:.4f}")
st.write(f"Precision: {precision:.4f}")
st.write(f"Recall: {recall:.4f}")
st.write(f"F1 Score: {f1:.4f}")
st.write("Classification Report:")
st.text(classification_report(y_test, y_pred))
# Visualization with multiple metrics
st.subheader("📊 Training History")
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
# Plot loss
ax1.plot(history.history['loss'], label='Train Loss')
ax1.plot(history.history['val_loss'], label='Validation Loss')
ax1.set_title('Loss Evolution')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
# Plot accuracy/metric
if problem_type == "Classification":
ax2.plot(history.history['accuracy'], label='Train Accuracy')
ax2.plot(history.history['val_accuracy'], label='Validation Accuracy')
ax2.set_title('Accuracy Evolution')
ax2.set_ylabel('Accuracy')
else:
ax2.plot(history.history['mae'], label='Train MAE')
ax2.plot(history.history['val_mae'], label='Validation MAE')
ax2.set_title('MAE Evolution')
ax2.set_ylabel('MAE')
ax2.set_xlabel('Epoch')
ax2.legend()
st.pyplot(fig)
st.success("Neural network trained successfully!")
except Exception as e:
st.error(f"An error occurred during training: {e}")
# Model Saving
if st.session_state.model is not None:
st.subheader("💾 Save Model")
model_filename = st.text_input("Enter Model Filename (without extension)", "neural_network")
if st.button("Save Model"):
try:
# Save the entire Keras model including architecture and weights
st.session_state.model.named_steps['model'].save(f"{model_filename}.h5") # Saves as a HDF5 file
st.success(f"Model saved as {model_filename}.h5")
except Exception as e:
st.error(f"Error saving model: {e}")