# Data-Vision / app.py
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import scipy.stats as stats
import requests
import asyncio
import base64
import joblib
import mimetypes
import time
from io import StringIO, BytesIO
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.ensemble import (RandomForestRegressor, RandomForestClassifier,
                              GradientBoostingRegressor, GradientBoostingClassifier)
from sklearn.svm import SVR, SVC
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, DBSCAN
from sklearn.feature_selection import SelectKBest, f_classif, f_regression
from sklearn.metrics import (mean_squared_error, r2_score, mean_absolute_error,
                             accuracy_score, precision_score, recall_score, f1_score,
                             roc_auc_score, silhouette_score, classification_report)
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.preprocessing import RobustScaler, StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping
from ydata_profiling import ProfileReport
from streamlit_pandas_profiling import st_profile_report
# Configurations
st.set_page_config(page_title="Executive Insights Pro", layout="wide", page_icon="📈")
# ----Load Image----
@st.cache_data(ttl=3600)
def load_image(image_url):
"""Loads an image from a URL and returns bytes."""
try:
        response = requests.get(image_url, stream=True, timeout=10)
response.raise_for_status()
return response.content
except requests.exceptions.RequestException as e:
st.error(f"Error loading image: {e}")
return None
# ---- Set the page background from a base64-encoded image ----
def set_background():
"""Sets the background image using base64 encoding."""
image_url = "https://wallpapers.com/images/featured/skrwoybjif4j8l2j.jpg" # Corporate Image
image_data = load_image(image_url)
if image_data:
# Convert bytes to base64
image_base64 = base64.b64encode(image_data).decode()
st.markdown(
f"""
<style>
.stApp {{
background-image: url(data:image/jpeg;base64,{image_base64});
background-size: cover;
background-position: center center;
background-attachment: fixed;
}}
</style>
""",
unsafe_allow_html=True,
)
return
# Simplified CSS
def apply_simplified_theme():
"""Injects simplified CSS to enhance Streamlit's default style."""
st.markdown(
"""
<style>
[data-testid="stSidebar"] {
background-color: rgba(52, 73, 94, 0.9);
color: white;
}
.main h1, .main h2, .main h3, .main h4, .main h5, .main h6 {
color: #5396C6;
}
.st-bb, .st-ae, .st-bv {
background-color: rgba(20, 20, 30, 0.3);
box-shadow: 1px 1px 5px #4e4e4e;
}
</style>
""",
unsafe_allow_html=True,
)
return
# Apply background and simplified theme
set_background()
apply_simplified_theme()
def show_loader(message="Loading..."):
    """Displays an animated loader. Defines the .loader CSS class that the markup below relies on."""
    st.markdown(
        f"""
        <style>
        .loader {{ border: 4px solid rgba(0, 247, 255, 0.2); border-top: 4px solid #00f7ff;
                   border-radius: 50%; width: 24px; height: 24px; animation: spin 1s linear infinite; }}
        @keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}
        </style>
        <div style="display: flex; align-items: center; justify-content: center; margin-top: 20px;">
            <div class="loader"></div>
            <span style="margin-left: 10px; color: #00f7ff;">{message}</span>
        </div>
        """,
        unsafe_allow_html=True
    )
@st.cache_data(ttl=3600)  # st.cache_data does not accept allow_output_mutation (that was st.cache)
def load_data(uploaded_file):
"""Load and cache dataset, with file type validation."""
if uploaded_file is not None:
file_extension = uploaded_file.name.split(".")[-1].lower()
mime_type = mimetypes.guess_type(uploaded_file.name)[0]
max_file_size_mb = 50 # Set a maximum file size (adjust as needed)
file_size_mb = uploaded_file.size / (1024 * 1024)
if file_size_mb > max_file_size_mb:
st.error(f"File size exceeds the limit of {max_file_size_mb} MB.")
return None
try: # Wrap file reading in a try...except
if file_extension == "csv" or mime_type == 'text/csv':
df = pd.read_csv(uploaded_file)
return df
elif file_extension in ["xlsx", "xls"] or mime_type in ['application/vnd.ms-excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet']:
df = pd.read_excel(uploaded_file)
return df
else:
st.error("Unsupported file type. Please upload a CSV or Excel file.")
return None
except FileNotFoundError:
st.error("File not found. Please check the file path.")
except pd.errors.ParserError: # Catch pandas-specific parsing errors
st.error("Error parsing the file. Make sure it's a valid CSV or Excel file.")
except Exception as e:
st.error(f"An unexpected error occurred: {type(e).__name__} - {str(e)}")
return None # Handle other potential exceptions
else:
return None
@st.cache_data(ttl=3600)
def generate_profile(df):
"""Generate automated EDA report"""
return ProfileReport(df, minimal=True)
# Session State Management
if 'raw_data' not in st.session_state:
st.session_state.raw_data = None
if 'cleaned_data' not in st.session_state:
st.session_state.cleaned_data = None
if 'train_test' not in st.session_state:
st.session_state.train_test = {}
if 'model' not in st.session_state:
st.session_state.model = None
if 'preprocessor' not in st.session_state:
    st.session_state.preprocessor = None  # stores the fitted ColumnTransformer
if 'feature_selector' not in st.session_state:
    st.session_state.feature_selector = None  # stores the fitted SelectKBest, if used
# Sidebar Navigation
st.sidebar.title("🔮 Data Wizard Pro")
# Apply custom CSS to change text color in the sidebar
st.markdown(
"""
<style>
[data-testid="stSidebar"] {
color: #00f7ff; /* Cyan color for sidebar text */
}
</style>
""",
unsafe_allow_html=True,
)
# Navigation options
app_mode = st.sidebar.radio("Navigate", [
"Data Upload",
"Smart Cleaning",
"Advanced EDA",
"Model Training",
"Predictions",
"Visualization Lab",
"Neural Network Studio" # New option
])
# --- Main App Logic ---
if app_mode == "Data Upload":
st.title("📤 Data Upload & Initial Analysis")
# File Upload Section with improved styling
st.markdown(
"""
<style>
.stFileUploader label {
color: #00f7ff !important; /* Cyan color for the label */
}
.stFileUploader div div div {
background-color: #141422 !important; /* Dark background */
color: #e0e0ff !important; /* Light text */
border: 1px solid #00f7ff !important; /* Cyan border */
border-radius: 10px;
}
</style>
""",
unsafe_allow_html=True,
)
uploaded_file = st.file_uploader(
"Choose a CSV or Excel file", type=["csv", "xlsx"],
help="Upload your dataset here. Supported formats: CSV, XLSX"
)
if uploaded_file:
df = load_data(uploaded_file)
if df is not None:
# only proceed if load_data returned a valid dataframe
st.session_state.raw_data = df
st.session_state.cleaned_data = df.copy()
st.subheader("Data Overview")
# Data Overview Cards with more context
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Number of Rows", df.shape[0], help="Total number of entries in the dataset.")
with col2:
st.metric("Number of Columns", df.shape[1], help="Total number of features in the dataset.")
with col3:
num_missing = df.isna().sum().sum()
st.metric("Total Missing Values", num_missing, help="Total number of missing entries across the entire dataset.")
# Display Data Types
st.write("Column Data Types:")
dtype_counts = df.dtypes.value_counts().to_dict()
for dtype, count in dtype_counts.items():
st.write(f"- {dtype}: {count} column(s)")
# Sample Data Table with improved display
st.subheader("Sample Data")
num_rows_preview = st.slider("Number of Rows to Preview", 5, 20, 10, help="Adjust the number of rows displayed in the sample data.")
st.dataframe(df.head(num_rows_preview), use_container_width=True)
# Column Statistics
with st.expander("📊 Column Statistics"):
for col in df.columns:
st.subheader(f"Column: {col}")
st.write(f"Data type: {df[col].dtype}")
if pd.api.types.is_numeric_dtype(df[col]):
st.write("Summary Statistics:")
st.write(df[col].describe())
else:
st.write("Value Counts:")
st.write(df[col].value_counts())
# Automated EDA Report
with st.expander("🚀 Automated Data Report"):
if st.button("Generate Smart Report"):
show_loader("Generating EDA Report")
pr = generate_profile(df)
st_profile_report(pr)
elif app_mode == "Smart Cleaning":
st.title("🧼 Intelligent Data Cleaning")
if st.session_state.raw_data is not None:
df = st.session_state.cleaned_data
# Cleaning Toolkit
col1, col2 = st.columns([1, 3])
with col1:
st.subheader("Cleaning Actions")
clean_action = st.selectbox("Choose Operation", [
"Handle Missing Values",
"Clean Text",
# ... other cleaning operations ...
])
if clean_action == "Handle Missing Values":
columns_with_missing = df.columns[df.isnull().any()].tolist()
column_to_impute = st.selectbox("Column to Impute", ["All Columns"] + columns_with_missing)
method = st.selectbox("Imputation Method", [
"KNN Imputation",
"Median Fill",
"Mean Fill",
"Drop Missing",
"Constant Value Fill"
])
if method == "KNN Imputation":
knn_neighbors = st.slider("KNN Neighbors", 2, 10, 5)
elif method == "Constant Value Fill":
constant_value = st.text_input("Constant Value")
            elif clean_action == "Clean Text":
                text_column = st.selectbox("Text Column", df.select_dtypes(include='object').columns)
                cleaning_operation = st.selectbox("Cleaning Operation", ["Remove Special Characters", "Lowercase", "Uppercase", "Remove Extra Spaces"])
                chars_to_remove = r'[^a-zA-Z0-9\s]'  # default pattern; only used by "Remove Special Characters"
                if cleaning_operation == "Remove Special Characters":
                    chars_to_remove = st.text_input("Regex of Characters to Remove", r'[^a-zA-Z0-9\s]')
with col2:
if st.button("Apply Transformation"):
with st.spinner("Applying changes..."):
current_df = df.copy()
# ... (your data history logic) ...
                    if clean_action == "Handle Missing Values":
                        if method == "KNN Imputation":
                            imputer = KNNImputer(n_neighbors=knn_neighbors)
                            if column_to_impute == "All Columns":
                                # KNNImputer only handles numeric data, so impute the numeric columns in place
                                numeric_cols = current_df.select_dtypes(include=np.number).columns
                                current_df[numeric_cols] = imputer.fit_transform(current_df[numeric_cols])
                            else:
                                current_df[[column_to_impute]] = imputer.fit_transform(current_df[[column_to_impute]])
                        elif method == "Median Fill":
                            if column_to_impute == "All Columns":
                                current_df = current_df.fillna(current_df.median(numeric_only=True))
                            else:
                                current_df[column_to_impute] = current_df[column_to_impute].fillna(current_df[column_to_impute].median())
                        elif method == "Mean Fill":
                            if column_to_impute == "All Columns":
                                current_df = current_df.fillna(current_df.mean(numeric_only=True))
                            else:
                                current_df[column_to_impute] = current_df[column_to_impute].fillna(current_df[column_to_impute].mean())
                        elif method == "Constant Value Fill":
                            if column_to_impute == "All Columns":
                                current_df = current_df.fillna(constant_value)
                            else:
                                current_df[column_to_impute] = current_df[column_to_impute].fillna(constant_value)
                        else:  # Drop Missing
                            if column_to_impute == "All Columns":
                                current_df = current_df.dropna()
                            else:
                                current_df = current_df.dropna(subset=[column_to_impute])
elif clean_action == "Clean Text":
                        import re  # local import; only the text-cleaning branch needs it
def clean_text(text, operation, chars_to_remove=r'[^a-zA-Z0-9\s]'):
if operation == "Remove Special Characters":
text = re.sub(chars_to_remove, '', str(text))
elif operation == "Lowercase":
text = str(text).lower()
elif operation == "Uppercase":
text = str(text).upper()
elif operation == "Remove Extra Spaces":
text = " ".join(str(text).split())
return text
current_df[text_column] = current_df[text_column].astype(str).apply(lambda x: clean_text(x, cleaning_operation, chars_to_remove))
st.session_state.cleaned_data = current_df
st.success("Transformation applied!")
elif app_mode == "Advanced EDA":
st.title("🔍 Advanced Exploratory Analysis")
if st.session_state.cleaned_data is not None:
df = st.session_state.cleaned_data.copy()
# Initialize session state for plot configuration
if 'plot_config' not in st.session_state:
st.session_state.plot_config = {
'plot_type': "Histogram",
'x_col': df.columns[0] if len(df.columns) > 0 else None,
'y_col': df.columns[1] if len(df.columns) > 1 else None,
'z_col': df.columns[2] if len(df.columns) > 2 else None,
'color_col': None,
'size_col': None,
'time_col': None,
'value_col': None,
'scatter_matrix_cols': df.select_dtypes(include=np.number).columns.tolist()[:5],
'color_palette': "#00f7ff",
'color_continuous_scale': "Viridis",
'hover_data_cols': [],
'filter_col': None,
'filter_options': []
}
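        # plot_config lives in session state so widget choices persist across Streamlit's top-to-bottom reruns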
# Data Filtering Section
with st.expander("🔎 Data Filtering", expanded=False):
# Use direct session state assignment for reactivity
st.session_state.plot_config['filter_col'] = st.selectbox(
"Filter Column",
[None] + list(df.columns),
help="Choose a column to filter the data."
)
if st.session_state.plot_config['filter_col']:
unique_values = df[st.session_state.plot_config['filter_col']].unique()
st.session_state.plot_config['filter_options'] = st.multiselect(
"Filter Values",
unique_values,
default=unique_values,
help=f"Select values from '{st.session_state.plot_config['filter_col']}'"
)
df = df[df[st.session_state.plot_config['filter_col']].isin(
st.session_state.plot_config['filter_options']
)]
# Visualization Configuration
st.sidebar.header("📊 Plot Configuration")
# Plot type selector
st.session_state.plot_config['plot_type'] = st.sidebar.selectbox(
"Choose Visualization",
[
"Histogram", "Scatter Plot", "Box Plot",
"Correlation Heatmap", "3D Scatter",
"Violin Plot", "Time Series", "Scatter Matrix"
],
            index=0
)
# Dynamic controls based on plot type
if st.session_state.plot_config['plot_type'] != "Correlation Heatmap":
st.session_state.plot_config['x_col'] = st.sidebar.selectbox(
"X Axis",
df.columns,
index=df.columns.get_loc(st.session_state.plot_config['x_col'])
if st.session_state.plot_config['x_col'] in df.columns else 0
)
if st.session_state.plot_config['plot_type'] in ["Scatter Plot", "Box Plot",
"Violin Plot", "Time Series",
"3D Scatter", "Histogram"]:
st.session_state.plot_config['y_col'] = st.sidebar.selectbox(
"Y Axis",
df.columns,
index=df.columns.get_loc(st.session_state.plot_config['y_col'])
if st.session_state.plot_config['y_col'] in df.columns else 0
)
if st.session_state.plot_config['plot_type'] == "3D Scatter":
st.session_state.plot_config['z_col'] = st.sidebar.selectbox(
"Z Axis",
df.columns,
index=df.columns.get_loc(st.session_state.plot_config['z_col'])
if st.session_state.plot_config['z_col'] in df.columns else 0
)
st.session_state.plot_config['color_col'] = st.sidebar.selectbox(
"Color by",
[None] + list(df.columns)
)
# Color configuration
if st.session_state.plot_config['plot_type'] == "Correlation Heatmap":
st.session_state.plot_config['color_continuous_scale'] = st.sidebar.selectbox(
"Color Scale",
['Viridis', 'Plasma', 'Magma', 'Cividis', 'RdBu']
)
else:
st.session_state.plot_config['color_palette'] = st.sidebar.selectbox(
"Color Palette",
['#00f7ff', '#ff00ff', '#f70000', '#0000f7']
)
# Additional configurations
if st.session_state.plot_config['plot_type'] == "Scatter Plot":
st.session_state.plot_config['size_col'] = st.sidebar.selectbox(
"Size by",
[None] + list(df.columns)
)
st.session_state.plot_config['hover_data_cols'] = st.sidebar.multiselect(
"Hover Data",
df.columns
)
if st.session_state.plot_config['plot_type'] == "Time Series":
st.session_state.plot_config['time_col'] = st.sidebar.selectbox(
"Time Column",
df.columns
)
st.session_state.plot_config['value_col'] = st.sidebar.selectbox(
"Value Column",
df.columns
)
if st.session_state.plot_config['plot_type'] == "Scatter Matrix":
st.session_state.plot_config['scatter_matrix_cols'] = st.multiselect(
"Columns for Scatter Matrix",
df.select_dtypes(include=np.number).columns,
default=st.session_state.plot_config['scatter_matrix_cols']
)
# Plot generation
try:
fig = None
config = st.session_state.plot_config
if config['plot_type'] == "Histogram":
fig = px.histogram(
df, x=config['x_col'], y=config['y_col'],
nbins=30, template="plotly_dark",
color_discrete_sequence=[config['color_palette']]
)
elif config['plot_type'] == "Scatter Plot":
fig = px.scatter(
df, x=config['x_col'], y=config['y_col'],
color_discrete_sequence=[config['color_palette']],
size=config['size_col'],
hover_data=config['hover_data_cols']
)
elif config['plot_type'] == "3D Scatter":
fig = px.scatter_3d(
df, x=config['x_col'], y=config['y_col'], z=config['z_col'],
color=config['color_col'],
color_discrete_sequence=[config['color_palette']]
)
elif config['plot_type'] == "Correlation Heatmap":
numeric_df = df.select_dtypes(include=np.number)
if not numeric_df.empty:
corr = numeric_df.corr()
fig = px.imshow(
corr, text_auto=True,
color_continuous_scale=config['color_continuous_scale']
)
else:
st.warning("No numerical columns found for correlation heatmap.")
elif config['plot_type'] == "Box Plot":
fig = px.box(
df, x=config['x_col'], y=config['y_col'],
color_discrete_sequence=[config['color_palette']]
)
elif config['plot_type'] == "Violin Plot":
fig = px.violin(
df, x=config['x_col'], y=config['y_col'],
box=True, points="all",
color_discrete_sequence=[config['color_palette']]
)
elif config['plot_type'] == "Time Series":
df = df.sort_values(by=config['time_col'])
fig = px.line(
df, x=config['time_col'], y=config['value_col'],
color_discrete_sequence=[config['color_palette']]
)
elif config['plot_type'] == "Scatter Matrix":
fig = px.scatter_matrix(
df, dimensions=config['scatter_matrix_cols'],
color_discrete_sequence=[config['color_palette']]
)
if fig:
st.plotly_chart(fig, use_container_width=True)
except Exception as e:
st.error(f"An error occurred while generating the plot: {e}")
with st.expander("🧪 Hypothesis Testing"):
test_type = st.selectbox("Select Test Type", ["T-test", "Chi-Squared Test"])
if test_type == "T-test":
col1 = st.selectbox("Column 1 (Numeric)", df.select_dtypes(include=np.number).columns)
col2 = st.selectbox("Column 2 (Categorical)", df.select_dtypes(include='object').columns)
if st.button("Run T-test"):
# Example: Split data by category and perform t-test
try:
groups = df.groupby(col2)[col1].apply(list)
if len(groups) == 2:
                            t_stat, p_value = stats.ttest_ind(groups.iloc[0], groups.iloc[1], nan_policy='omit')
st.write(f"T-statistic: {t_stat:.4f}")
st.write(f"P-value: {p_value:.4f}")
if p_value < 0.05:
st.write("Reject the null hypothesis.")
else:
st.write("Fail to reject the null hypothesis.")
else:
st.write("Select a categorical column with exactly two categories.")
except Exception as e:
st.error(f"An error occurred during the T-test: {e}")
elif app_mode == "Model Training":
st.title("🚂 Model Training")
feature_selection_method = st.selectbox("Feature Selection Method", ["None", "SelectKBest"])
if model_name == "Random Forest":
param_grid = {
'n_estimators': st.slider("Number of Estimators", 10, 200, 100, help="Number of trees in the forest."),
'max_depth': st.slider("Max Depth", 3, 20, 10, help="Maximum depth of the tree."),
'min_samples_split': st.slider("Minimum Samples Split", 2, 10, 2, help="Minimum samples required to split an internal node"), #New hyperparameter
'min_samples_leaf': st.slider("Minimum Samples Leaf", 1, 10, 1, help="Minimum samples required to be at a leaf node"), #New hyperparameter
}
#Inside the train model button
if st.button("Train Model"):
#Feature Selection
if feature_selection_method == "SelectKBest":
k = st.slider("Number of Features to Select", 1, len(feature_columns), len(feature_columns))
selector = SelectKBest(k=k)
X_train_selected = selector.fit_transform(X_train_processed, y_train)
X_test_selected = selector.transform(X_test_processed)
else:
X_train_selected = X_train_processed
X_test_selected = X_test_processed
# Model Training and Hyperparameter Tuning
if model_name == "Linear Regression":
model = LinearRegression()
elif model_name == "Logistic Regression":
model = LogisticRegression(max_iter=1000)
elif model_name == "Decision Tree":
if problem_type == "Regression":
model = DecisionTreeRegressor()
else:
model = DecisionTreeClassifier()
elif model_name == "Random Forest":
if problem_type == "Regression":
model = RandomForestRegressor(random_state=42)
grid_search = GridSearchCV(model, param_grid, cv=3, scoring='neg_mean_squared_error') # Example scoring
grid_search.fit(X_train_selected, y_train)
model = grid_search.best_estimator_
st.write("Best Parameters:", grid_search.best_params_)
else:
model = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(model, param_grid, cv=3, scoring='accuracy')
grid_search.fit(X_train_selected, y_train)
model = grid_search.best_estimator_
st.write("Best Parameters:", grid_search.best_params_)
elif model_name == "Gradient Boosting":
model = GradientBoostingRegressor() if problem_type == "Regression" else GradientBoostingClassifier()
elif model_name == "SVM":
model = SVR() if problem_type == "Regression" else SVC()
# Cross-validation
cv_scores = cross_val_score(model, X_train_selected, y_train, cv=5) #example, adjust cv
st.write(f"Cross-validation scores: {cv_scores}")
st.write(f"Mean cross-validation score: {cv_scores.mean():.4f}")
model.fit(X_train_selected, y_train)
# Model Saving
model_filename = st.text_input("Enter Model Filename (without extension)", "trained_model")
if st.button("Save Model"):
try:
joblib.dump(st.session_state.model, f"{model_filename}.joblib")
st.success(f"Model saved as {model_filename}.joblib")
except Exception as e:
st.error(f"Error saving model: {e}")
# Model loading in a different section
model_file = st.file_uploader("Upload Trained Model", type=["joblib"])
if model_file is not None:
try:
st.session_state.model = joblib.load(model_file)
st.success("Model loaded successfully!")
except Exception as e:
st.error(f"Error loading model: {e}")
#Model Evaluation Section
y_pred = model.predict(X_test_selected)
if problem_type == "Regression":
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
st.write(f"Mean Squared Error: {mse:.4f}")
st.write(f"R-squared: {r2:.4f}")
else:
accuracy = accuracy_score(y_test, y_pred)
st.write(f"Accuracy: {accuracy:.4f}")
elif app_mode == "Predictions":
st.title("🔮 Make Predictions")
if st.session_state.model is not None and st.session_state.cleaned_data is not None:
df = st.session_state.cleaned_data.copy()
# Input data for prediction
st.subheader("Enter Data for Prediction")
input_data = {}
model_columns = st.session_state.model.steps[0][1].transformers_[0][2] + st.session_state.model.steps[0][1].transformers_[1][2]
if not set(model_columns).issubset(set(df.drop(columns=[st.session_state.model.steps[-1][0]]).columns)):
st.error("The model was trained on a dataframe that contains different columns than the currently uploaded dataframe. Please upload the correct dataframe.")
st.stop()
for col in model_columns:
if pd.api.types.is_numeric_dtype(df[col]):
input_data[col] = st.number_input(f"Enter {col}", value=df[col].mean())
else:
input_data[col] = st.selectbox(f"Select {col}", df[col].unique())
# Prediction Button
if st.button("Make Prediction"):
try:
input_df = pd.DataFrame([input_data])
prediction = st.session_state.model.predict(input_df)[0]
st.subheader("Prediction Result")
st.write(f"The predicted value is: {prediction}")
# Additional Feedback (Example for Classification)
if isinstance(st.session_state.model.steps[-1][1], LogisticRegression):
probabilities = st.session_state.model.predict_proba(input_df)[0]
st.write("Predicted Probabilities:")
st.write(probabilities)
except Exception as e:
st.error(f"An error occurred during prediction: {e}")
else:
st.write("Please train a model first in the 'Model Training' section.")
#Add batch prediction section in prediction tab
st.subheader("Batch Predictions")
batch_file = st.file_uploader("Upload CSV for Batch Predictions", type=["csv"])
if batch_file is not None:
try:
batch_df = pd.read_csv(batch_file)
# Preprocess the batch data
batch_processed = st.session_state.preprocessor.transform(batch_df)
# Make predictions
batch_predictions = st.session_state.model.predict(batch_processed)
batch_df['Prediction'] = batch_predictions
st.dataframe(batch_df)
# Download predictions
csv = batch_df.to_csv(index=False)
b64 = base64.b64encode(csv.encode()).decode() # some strings
href = f'<a href="data:file/csv;base64,{b64}" download="predictions.csv">Download Predictions CSV</a>'
st.markdown(href, unsafe_allow_html=True)
except Exception as e:
st.error(f"Error processing batch file: {e}")
elif app_mode == "Visualization Lab":
st.title("🔬 Advanced Data Visualization and Clustering Lab")
# Initialize session state for cleaned data
if 'cleaned_data' not in st.session_state:
st.session_state.cleaned_data = None
    # Optional: upload a CSV directly in this section
uploaded_file = st.file_uploader("Upload a CSV file", type=["csv"])
if uploaded_file is not None:
try:
df = pd.read_csv(uploaded_file)
st.session_state.cleaned_data = df
st.success("Data loaded successfully!")
except Exception as e:
st.error(f"Error loading data: {e}")
if st.session_state.cleaned_data is not None:
df = st.session_state.cleaned_data.copy()
# Visualization Type Selection
visualization_type = st.selectbox("Select Visualization Type", [
"Pair Plot", "Parallel Coordinates Plot", "Andrews Curves", "Pie Chart",
"Area Chart", "Density Contour", "Sunburst Chart", "Funnel Chart", "Clustering Analysis"
])
if visualization_type == "Pair Plot":
st.subheader("Pair Plot")
cols_for_pairplot = st.multiselect("Select Columns for Pair Plot", df.select_dtypes(include=np.number).columns.tolist(), default=df.select_dtypes(include=np.number).columns.tolist()[:3])
if cols_for_pairplot:
fig = px.scatter_matrix(df, dimensions=cols_for_pairplot)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Parallel Coordinates Plot":
st.subheader("Parallel Coordinates Plot")
cols_for_parallel = st.multiselect("Select Columns for Parallel Coordinates", df.select_dtypes(include=np.number).columns.tolist(), default=df.select_dtypes(include=np.number).columns.tolist()[:5])
if cols_for_parallel:
fig = px.parallel_coordinates(df[cols_for_parallel], color=df[cols_for_parallel[0]] if cols_for_parallel else None)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Andrews Curves":
st.subheader("Andrews Curves")
cols_for_andrews = st.multiselect("Select Columns for Andrews Curves", df.select_dtypes(include=np.number).columns.tolist(), default=df.select_dtypes(include=np.number).columns.tolist()[:5])
if cols_for_andrews:
fig = px.andrews_curves(df[cols_for_andrews + [df.columns[0]]], class_column=df.columns[0])
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Pie Chart":
st.subheader("Pie Chart")
col_for_pie = st.selectbox("Select Column for Pie Chart", df.columns)
fig = px.pie(df, names=col_for_pie)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Area Chart":
st.subheader("Area Chart")
cols_for_area = st.multiselect("Select Columns for Area Chart", df.select_dtypes(include=np.number).columns.tolist(), default=df.select_dtypes(include=np.number).columns.tolist()[:3])
if cols_for_area:
fig = px.area(df[cols_for_area])
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Density Contour":
st.subheader("Density Contour")
x_col = st.selectbox("Select X Column for Density Contour", df.select_dtypes(include=np.number).columns.tolist())
y_col = st.selectbox("Select Y Column for Density Contour", df.select_dtypes(include=np.number).columns.tolist())
fig = px.density_contour(df, x=x_col, y=y_col)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Sunburst Chart":
st.subheader("Sunburst Chart")
path_cols = st.multiselect("Select Path Columns for Sunburst Chart", df.columns)
if path_cols:
fig = px.sunburst(df, path=path_cols)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Funnel Chart":
st.subheader("Funnel Chart")
x_col = st.selectbox("Select X Column for Funnel Chart (Values)", df.select_dtypes(include=np.number).columns.tolist())
y_col = st.selectbox("Select Y Column for Funnel Chart (Categories)", df.columns)
fig = px.funnel(df, x=x_col, y=y_col)
st.plotly_chart(fig, use_container_width=True)
elif visualization_type == "Clustering Analysis":
st.subheader("Clustering Analysis")
numerical_cols = df.select_dtypes(include=np.number).columns.tolist()
if not numerical_cols:
st.warning("No numerical columns found for clustering.")
else:
cluster_cols = st.multiselect("Select Columns for Clustering", numerical_cols, default=numerical_cols[:2] if len(numerical_cols) >= 2 else numerical_cols)
if cluster_cols:
try:
scaler = StandardScaler()
scaled_data = scaler.fit_transform(df[cluster_cols])
n_clusters = st.slider("Number of Clusters", 2, 10, 3, help="Number of clusters to form.")
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(scaled_data)
df['Cluster'] = clusters
if len(cluster_cols) == 2:
fig = px.scatter(df, x=cluster_cols[0], y=cluster_cols[1], color='Cluster', title="K-Means Clustering")
st.plotly_chart(fig, use_container_width=True)
elif len(cluster_cols) == 3:
fig = px.scatter_3d(df, x=cluster_cols[0], y=cluster_cols[1], z=cluster_cols[2], color='Cluster', title="K-Means Clustering (3D)")
st.plotly_chart(fig, use_container_width=True)
else:
st.write("Clustering visualization is only supported for 2 or 3 selected columns.")
st.success("Clustering applied successfully!")
except Exception as e:
st.error(f"An error occurred during clustering: {e}")
#Add clustering performance in clustering analysis
if len(cluster_cols) >= 2: # Evaluate Silhouette Score
try:
silhouette_avg = silhouette_score(scaled_data, clusters)
st.write(f"Silhouette Score: {silhouette_avg:.4f}")
except:
st.write("Could not compute silhouette score")
#Add dimensionality reduction option and 2d/3d plots
dimension_reduction = st.selectbox("Dimensionality Reduction", ["None", "PCA"])
if dimension_reduction == "PCA":
n_components = st.slider("Number of Components", 2, min(3, len(cluster_cols)), 2)
pca = PCA(n_components=n_components)
principal_components = pca.fit_transform(scaled_data)
pca_df = pd.DataFrame(data=principal_components, columns=[f'PC{i + 1}' for i in range(n_components)])
pca_df['Cluster'] = clusters # Add Cluster
if len(cluster_cols) >= 2: #plotting section
fig = None #Initialize fig
if dimension_reduction == "None":
if len(cluster_cols) == 2:
fig = px.scatter(df, x=cluster_cols[0], y=cluster_cols[1], color='Cluster', title="K-Means Clustering")
st.plotly_chart(fig, use_container_width=True)
elif len(cluster_cols) == 3:
fig = px.scatter_3d(df, x=cluster_cols[0], y=cluster_cols[1], z=cluster_cols[2], color='Cluster', title="K-Means Clustering (3D)")
st.plotly_chart(fig, use_container_width=True)
else:
st.write("Clustering visualization is only supported for 2 or 3 selected columns.")
elif dimension_reduction == "PCA":
if n_components == 2:
fig = px.scatter(pca_df, x='PC1', y='PC2', color='Cluster', title="K-Means Clustering (PCA - 2D)")
st.plotly_chart(fig, use_container_width=True)
elif n_components == 3:
fig = px.scatter_3d(pca_df, x='PC1', y='PC2', z='PC3', color='Cluster', title="K-Means Clustering (PCA - 3D)")
st.plotly_chart(fig, use_container_width=True)
else:
st.write("PCA visualization is only supported for 2 or 3 components.")
elif app_mode == "Neural Network Studio":
st.title("🧠 Neural Network Studio")
if st.session_state.cleaned_data is not None:
df = st.session_state.cleaned_data.copy()
# Target Variable Selection
target_column = st.selectbox("Select Target Variable", df.columns, help="Choose the column you want to predict.")
# Problem Type Selection
problem_type = st.radio("Select Problem Type", ["Regression", "Classification"], help="Choose the type of machine learning problem.")
# Feature Selection (optional)
use_all_features = st.checkbox("Use All Features", value=True, help="Select to use all features for training. Deselect to manually choose features.")
if use_all_features:
feature_columns = df.drop(columns=[target_column]).columns.tolist()
else:
feature_columns = st.multiselect("Select Feature Columns", df.drop(columns=[target_column]).columns, help="Choose the features you want to use for prediction.")
# Model Selection
model_type = st.selectbox("Select Neural Network Model", [
"Simple Neural Network", "Convolutional Neural Network (CNN)", "Recurrent Neural Network (RNN)"
], help="Choose the neural network model to use.")
# Hyperparameter Tuning
with st.expander("Hyperparameter Tuning", expanded=False):
if model_type == "Simple Neural Network":
hidden_layers = st.slider("Number of Hidden Layers", 1, 5, 2, help="Number of hidden layers in the network.")
neurons_per_layer = st.slider("Neurons per Layer", 10, 200, 50, help="Number of neurons in each hidden layer.")
epochs = st.slider("Epochs", 10, 200, 50, help="Number of epochs for training.")
batch_size = st.slider("Batch Size", 16, 128, 32, help="Batch size for training.")
elif model_type == "Convolutional Neural Network (CNN)":
epochs_cnn = st.slider("Epochs", 10, 200, 50, help="Number of epochs for CNN training.")
batch_size_cnn = st.slider("Batch Size", 16, 128, 32, help="Batch size for CNN training.")
elif model_type == "Recurrent Neural Network (RNN)":
epochs_rnn = st.slider("Epochs", 10, 200, 50, help="Number of epochs for RNN training.")
batch_size_rnn = st.slider("Batch Size", 16, 128, 32, help="Batch size for RNN training.")
sequence_length = st.slider("Sequence Length (for RNN)", 10, 100, 30, help="Length of the input sequences for RNN.")
# Train-Test Split
test_size = st.slider("Test Size", 0.1, 0.5, 0.2, help="Proportion of the data to use for testing.")
# Model Training Button
if st.button("Train Neural Network Model"):
with st.spinner("Training neural network model..."):
try:
                    # Split data
                    X = df[feature_columns]
                    y = df[target_column]
                    if problem_type == "Classification":
                        # sparse_categorical_crossentropy expects integer class labels
                        y = pd.Series(LabelEncoder().fit_transform(y), index=y.index)
                    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
# Preprocessing
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
numeric_features = X_train.select_dtypes(include=np.number).columns
categorical_features = X_train.select_dtypes(include='object').columns
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
                    X_train_processed = preprocessor.fit_transform(X_train)
                    X_test_processed = preprocessor.transform(X_test)
                    # Keras expects dense arrays; densify if one-hot encoding produced a sparse matrix
                    if hasattr(X_train_processed, "toarray"):
                        X_train_processed = X_train_processed.toarray()
                        X_test_processed = X_test_processed.toarray()
# Neural Network Model Selection and Training
tf.random.set_seed(42) # for reproducibility
# Callbacks (Early Stopping)
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
if model_type == "Simple Neural Network":
model = keras.Sequential()
model.add(layers.Input(shape=(X_train_processed.shape[1],)))
for _ in range(hidden_layers):
model.add(layers.Dense(neurons_per_layer, activation=activation)) # Use the selected activation
model.add(
layers.Dense(1 if problem_type == "Regression" else len(np.unique(y_train)),
activation='linear' if problem_type == "Regression" else 'softmax'))
optimizer = keras.optimizers.Adam(learning_rate=learning_rate) # Use the learning rate
model.compile(optimizer=optimizer,
loss='mse' if problem_type == "Regression" else 'sparse_categorical_crossentropy',
metrics=['mae'] if problem_type == "Regression" else ['accuracy'])
history = model.fit(X_train_processed, y_train, epochs=epochs, batch_size=batch_size,
validation_split=0.2, verbose=0,
callbacks=[early_stopping]) # Added early stopping
y_pred = model.predict(X_test_processed)
if problem_type == "Classification":
y_pred = np.argmax(y_pred, axis=1)
elif model_type == "Convolutional Neural Network (CNN)":
X_train_cnn = np.expand_dims(X_train_processed, axis=2)
X_test_cnn = np.expand_dims(X_test_processed, axis=2)
                        model = keras.Sequential()
                        model.add(layers.Input(shape=(X_train_cnn.shape[1], 1)))
                        model.add(layers.Conv1D(filters=filters, kernel_size=kernel_size, activation='relu'))
                        model.add(layers.MaxPooling1D(pool_size=pooling_size))
model.add(layers.Flatten())
model.add(layers.Dense(50, activation='relu'))
model.add(
layers.Dense(1 if problem_type == "Regression" else len(np.unique(y_train)),
activation='linear' if problem_type == "Regression" else 'softmax'))
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='mse' if problem_type == "Regression" else 'sparse_categorical_crossentropy',
metrics=['mae'] if problem_type == "Regression" else ['accuracy'])
history = model.fit(X_train_cnn, y_train, epochs=epochs_cnn, batch_size=batch_size_cnn,
validation_split=0.2, verbose=0,
callbacks=[early_stopping])
y_pred = model.predict(X_test_cnn)
if problem_type == "Classification":
y_pred = np.argmax(y_pred, axis=1)
elif model_type == "Recurrent Neural Network (RNN)":
                        try:
                            # The flat feature vector is reshaped into (samples, timesteps, features),
                            # which requires the feature count to divide evenly by sequence_length
                            if X_train_processed.shape[1] % sequence_length != 0:
                                st.error("The number of processed features must be divisible by the sequence length to reshape for the RNN.")
                                st.stop()
                            X_train_rnn = np.reshape(X_train_processed, (
                                X_train_processed.shape[0], sequence_length,
                                X_train_processed.shape[1] // sequence_length))
                            X_test_rnn = np.reshape(X_test_processed, (
                                X_test_processed.shape[0], sequence_length, X_test_processed.shape[1] // sequence_length))
                            model = keras.Sequential()
                            model.add(layers.Input(shape=(X_train_rnn.shape[1], X_train_rnn.shape[2])))
                            model.add(layers.SimpleRNN(units, activation='relu', dropout=dropout_rate))
model.add(
layers.Dense(1 if problem_type == "Regression" else len(np.unique(y_train)),
activation='linear' if problem_type == "Regression" else 'softmax'))
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer,
loss='mse' if problem_type == "Regression" else 'sparse_categorical_crossentropy',
metrics=['mae'] if problem_type == "Regression" else ['accuracy'])
history = model.fit(X_train_rnn, y_train, epochs=epochs_rnn, batch_size=batch_size_rnn,
validation_split=0.2, verbose=0,
callbacks=[early_stopping])
y_pred = model.predict(X_test_rnn)
if problem_type == "Classification":
y_pred = np.argmax(y_pred, axis=1)
except Exception as e:
st.error(f"Error during RNN training: {e}")
st.stop() # Stop execution if RNN fails
# Evaluation
if problem_type == "Regression":
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
st.write(f"Mean Squared Error: {mse:.4f}")
st.write(f"Root Mean Squared Error: {rmse:.4f}")
st.write(f"Mean Absolute Error: {mae:.4f}")
st.write(f"R-squared: {r2:.4f}")
else:
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
st.write(f"Accuracy: {accuracy:.4f}")
st.write(f"Precision: {precision:.4f}")
st.write(f"Recall: {recall:.4f}")
st.write(f"F1 Score: {f1:.4f}")
st.write("Classification Report:")
st.text(classification_report(y_test, y_pred))
# Visualization
st.subheader("Training History")
fig, ax = plt.subplots() # Use matplotlib directly
ax.plot(history.history['loss'], label='loss')
ax.plot(history.history['val_loss'], label='val_loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
st.pyplot(fig) # Display with st.pyplot
st.success("Model trained successfully!")
except Exception as e:
st.error(f"An error occurred during training: {e}")
    else:
        st.write("Please upload a dataset in the 'Data Upload' section first.")