Spaces:
Sleeping
Sleeping
import os | |
# Restrict the native numeric runtimes to one thread each and set
# KMP_DUPLICATE_LIB_OK (commonly used to tolerate a duplicated OpenMP runtime).
# These assignments sit ahead of the numeric imports below, presumably so the
# libraries pick them up when they initialise.
os.environ.update({
    'KMP_DUPLICATE_LIB_OK': 'True',
    'OMP_NUM_THREADS': '1',
    'OPENBLAS_NUM_THREADS': '1',
    'MKL_NUM_THREADS': '1',
    'VECLIB_MAXIMUM_THREADS': '1',
    'NUMEXPR_NUM_THREADS': '1',
})
import streamlit as st | |
import torch | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import shap | |
from sklearn.preprocessing import MinMaxScaler | |
import plotly.graph_objects as go | |
import io | |
from matplotlib.figure import Figure | |
import math | |
import torch.nn.functional as F | |
# Page-level Streamlit settings: browser-tab title, icon and wide layout.
st.set_page_config(page_title="Waste Properties Predictor", page_icon="🔄", layout="wide")

# Small stylesheet injected as raw HTML: caps and centres the app width, pads
# the main area and stretches buttons to the full width of their container.
custom_css = """
<style>
.stApp {
max-width: 1200px;
margin: 0 auto;
}
.main {
padding: 2rem;
}
.stButton>button {
width: 100%;
}
</style>
"""
st.markdown(custom_css, unsafe_allow_html=True)
# Load the trained model and recreate the architecture for both friction and cohesion | |
class DualStreamNet(torch.nn.Module):
    """Regression network that merges three parallel branches into one output.

    Branches (each ends in 8 features):
      1. an MLP: input_size -> 64 -> 1000 -> 200 -> 8;
      2. a "feature attention" branch: 16-d projection, self-attention with a
         residual connection and LayerNorm, then 16 -> 32 -> 8;
      3. a "batch attention" branch with the same layout but its own weights.

    The three 8-d results are concatenated along dim 1 and mapped to a single
    value by ``final_fc`` (24 -> 1), so ``forward`` expects 2-D input of shape
    (rows, input_size). One ``Dropout(0.2)`` module is shared by all branches.

    Attribute names and their creation order define the state-dict keys and
    the order in which the seeded initialiser runs; keep them stable so saved
    checkpoints continue to load.
    """

    def __init__(self, input_size):
        """Create all layers for ``input_size`` features and initialise them."""
        super().__init__()
        nn = torch.nn

        # Branch 1: plain MLP
        self.mlp_fc1 = nn.Linear(input_size, 64)
        self.mlp_fc2 = nn.Linear(64, 1000)
        self.mlp_fc3 = nn.Linear(1000, 200)
        self.mlp_fc4 = nn.Linear(200, 8)

        # Branch 2: feature-attention layers
        self.feature_attention_dim = 16
        feat_dim = self.feature_attention_dim
        self.feature_projection = nn.Linear(input_size, feat_dim)
        self.feature_query = nn.Linear(feat_dim, feat_dim)
        self.feature_key = nn.Linear(feat_dim, feat_dim)
        self.feature_value = nn.Linear(feat_dim, feat_dim)
        self.feature_norm = nn.LayerNorm(feat_dim)

        # Branch 3: batch-attention layers
        self.batch_attention_dim = 16
        batch_dim = self.batch_attention_dim
        self.batch_projection = nn.Linear(input_size, batch_dim)
        self.batch_query = nn.Linear(batch_dim, batch_dim)
        self.batch_key = nn.Linear(batch_dim, batch_dim)
        self.batch_value = nn.Linear(batch_dim, batch_dim)
        self.batch_norm = nn.LayerNorm(batch_dim)

        # Heads that reduce each attention branch to 8 features
        self.feature_att_fc1 = nn.Linear(feat_dim, 32)
        self.feature_att_fc2 = nn.Linear(32, 8)
        self.batch_att_fc1 = nn.Linear(batch_dim, 32)
        self.batch_att_fc2 = nn.Linear(32, 8)

        # 8 (MLP) + 8 (feature attention) + 8 (batch attention) -> 1
        self.final_fc = nn.Linear(24, 1)
        self.dropout = nn.Dropout(0.2)

        # Re-initialise every Linear layer (Xavier weights, zero bias).
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Xavier-uniform weights and zero bias for Linear modules; others untouched."""
        if not isinstance(module, torch.nn.Linear):
            return
        torch.nn.init.xavier_uniform_(module.weight)
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def feature_attention(self, x):
        """Return LayerNorm(softmax(QK^T / sqrt(16)) V + projection) for ``x``.

        Q, K and V are linear maps of the 16-d projection of ``x``. The scores
        are taken over the last two dimensions, so for 2-D input of shape
        (rows, input_size) the score matrix is (rows, rows).
        """
        base = self.feature_projection(x)
        q = self.feature_query(base)
        k = self.feature_key(base)
        v = self.feature_value(base)

        scale = math.sqrt(self.feature_attention_dim)
        weights = F.softmax(torch.matmul(q, k.transpose(-2, -1)) / scale, dim=-1)

        # Residual connection followed by layer normalisation.
        return self.feature_norm(torch.matmul(weights, v) + base)

    def batch_attention(self, x):
        """Attend across the rows of ``x`` (dim 0) and return the normalised result.

        With more than one row, every row's output is a softmax-weighted mix of
        all rows' value vectors plus its own projection, passed through
        ``batch_norm``. With zero or one row the method short-circuits.
        """
        n_rows = x.size(0)
        if n_rows <= 1:
            # NOTE(review): this fallback returns feature_projection(x), not
            # batch_projection(x), and skips batch_norm. It may be a copy/paste
            # slip, but changing it would alter single-row predictions of
            # already-trained weights -- confirm against the training code.
            return self.feature_projection(x)

        base = self.batch_projection(x)
        # Flatten everything but dim 0 so rows can be compared with plain mm.
        q = self.batch_query(base).view(n_rows, -1)
        k = self.batch_key(base).view(n_rows, -1)
        v = self.batch_value(base).view(n_rows, -1)

        # (rows, rows) similarity, normalised per row.
        weights = F.softmax(torch.mm(q, k.t()) / math.sqrt(k.size(1)), dim=1)
        mixed = torch.mm(weights, v)

        # Residual connection followed by layer normalisation.
        return self.batch_norm(mixed.view_as(base) + base)

    def forward(self, x):
        """Run the three branches on ``x`` and return a (rows, 1) prediction."""
        # Branch 1: four Linear -> ReLU -> Dropout stages.
        mlp_out = x
        for layer in (self.mlp_fc1, self.mlp_fc2, self.mlp_fc3, self.mlp_fc4):
            mlp_out = self.dropout(F.relu(layer(mlp_out)))

        # Branch 2: feature attention followed by its two-layer head.
        feature_out = self.feature_attention(x)
        for layer in (self.feature_att_fc1, self.feature_att_fc2):
            feature_out = self.dropout(F.relu(layer(feature_out)))

        # Branch 3: batch attention followed by its two-layer head.
        batch_out = self.batch_attention(x)
        for layer in (self.batch_att_fc1, self.batch_att_fc2):
            batch_out = self.dropout(F.relu(layer(batch_out)))

        merged = torch.cat([mlp_out, feature_out, batch_out], dim=1)
        return self.final_fc(merged)
def _select_features_and_fit_scalers(X, y, threshold=0.1):
    """Keep the columns of ``X`` whose |correlation| with ``y`` exceeds ``threshold``.

    Returns ``(X_selected, scaler_X, scaler_y)`` where both scalers are
    ``MinMaxScaler`` instances fitted on the selected columns and on ``y``.
    """
    correlation = X.corrwith(pd.Series(y)).abs()
    X_selected = X[correlation[correlation > threshold].index]
    scaler_X = MinMaxScaler().fit(X_selected)
    scaler_y = MinMaxScaler().fit(y.reshape(-1, 1))
    return X_selected, scaler_X, scaler_y


def _load_trained_net(weights_path, input_size, device):
    """Build a ``DualStreamNet``, load its weights onto ``device``, switch to eval mode."""
    net = DualStreamNet(input_size=input_size).to(device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
    net.load_state_dict(torch.load(weights_path, map_location=device))
    net.eval()
    return net


def load_model_and_data():
    """Load the dataset, rebuild feature selection and scalers, and load both models.

    Reads ``Data_syw_r.xlsx``, takes columns 1-16 and 21-22 (by position) as
    candidate inputs, column 28 as the friction target and column 25 as the
    cohesion target. For each target the inputs are filtered by absolute
    correlation (> 0.1) and MinMax scalers are fitted; the number of selected
    columns sets the input size of the corresponding network.

    Returns:
        An 11-tuple: (friction_model, friction feature names, friction X
        scaler, friction y scaler, cohesion_model, cohesion feature names,
        cohesion X scaler, cohesion y scaler, device, X_friction, X_cohesion).

    Raises:
        Whatever ``pd.read_excel`` / ``torch.load`` / ``load_state_dict`` raise
        when a file is missing or does not match the expected layout.
    """
    # Fixed seeds (note: also resets the global NumPy / torch RNG state).
    np.random.seed(32)
    torch.manual_seed(42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    data = pd.read_excel("Data_syw_r.xlsx")
    X = data.iloc[:, list(range(1, 17)) + list(range(21, 23))]

    X_friction, scaler_X_friction, scaler_y_friction = _select_features_and_fit_scalers(
        X, data.iloc[:, 28].values
    )
    X_cohesion, scaler_X_cohesion, scaler_y_cohesion = _select_features_and_fit_scalers(
        X, data.iloc[:, 25].values
    )

    friction_model = _load_trained_net('best_friction_model.pt', X_friction.shape[1], device)
    cohesion_model = _load_trained_net('cohebest.pt', X_cohesion.shape[1], device)

    return (friction_model, X_friction.columns, scaler_X_friction, scaler_y_friction,
            cohesion_model, X_cohesion.columns, scaler_X_cohesion, scaler_y_cohesion,
            device, X_friction, X_cohesion)
def predict_friction(input_values, model, scaler_X, scaler_y, device):
    """Predict the friction target for the first row of ``input_values``.

    The input is scaled with ``scaler_X``, converted to a float32 tensor on
    ``device`` and passed through ``model`` with gradients disabled. The model
    output is mapped back with ``scaler_y.inverse_transform`` and the value at
    position [0][0] is returned.
    """
    features = torch.tensor(scaler_X.transform(input_values), dtype=torch.float32).to(device)

    with torch.no_grad():
        scaled_output = model(features)

    # inverse_transform expects a 2-D column of values.
    as_column = scaled_output.cpu().numpy().reshape(-1, 1)
    return scaler_y.inverse_transform(as_column)[0][0]
def predict_cohesion(input_values, model, scaler_X, scaler_y, device):
    """Predict the cohesion target for the first row of ``input_values``.

    The input is scaled with ``scaler_X``, converted to a float32 tensor on
    ``device`` and passed through ``model`` with gradients disabled. The model
    output is mapped back with ``scaler_y.inverse_transform`` and the value at
    position [0][0] is returned.
    """
    features = torch.tensor(scaler_X.transform(input_values), dtype=torch.float32).to(device)

    with torch.no_grad():
        scaled_output = model(features)

    # inverse_transform expects a 2-D column of values.
    as_column = scaled_output.cpu().numpy().reshape(-1, 1)
    return scaler_y.inverse_transform(as_column)[0][0]
def calculate_shap_values(input_values, model, X, scaler_X, scaler_y, device):
    """Estimate Kernel SHAP contributions for the first row of ``input_values``.

    Args:
        input_values: DataFrame holding the row to explain (unscaled features).
        model: network returning one scaled prediction per input row.
        X: DataFrame of unscaled reference rows; summarised to 10 k-means
            centroids that serve as the SHAP background.
        scaler_X, scaler_y: fitted scalers for the inputs and the target.
        device: torch device on which ``model`` lives.

    Returns:
        ``(shap_values, base_value)``: a 1-D array with one contribution per
        feature and the explainer's expected value, both in target units
        because the wrapped predictor already inverse-transforms its output.
        On any failure an error is shown and ``(zeros, 0.0)`` is returned so
        the page can still render.
    """

    def model_predict(rows):
        # Evaluate one row at a time. The network passed in by this app mixes
        # information between the rows of a batch, so scoring SHAP's whole
        # perturbation matrix in one call would make each output depend on the
        # other perturbed rows and disagree with the single-row prediction the
        # app reports. For a row-independent model this gives identical values.
        rows_tensor = torch.FloatTensor(scaler_X.transform(rows)).to(device)
        model.eval()
        with torch.no_grad():
            scaled_predictions = torch.cat(
                [model(row.unsqueeze(0)).reshape(-1) for row in rows_tensor]
            ).cpu().numpy()
        # Map the scaled outputs back to target units.
        return scaler_y.inverse_transform(scaled_predictions.reshape(-1, 1)).flatten()

    try:
        # Fixed seed so the k-means summary and the sampling are repeatable.
        np.random.seed(42)
        background = shap.kmeans(X.values, 10)
        explainer = shap.KernelExplainer(model_predict, background)

        shap_values = explainer.shap_values(input_values.values, nsamples=200)
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])

        # Already in target units; accept a scalar, 0-d or 1-d value.
        expected_value = float(np.ravel(explainer.expected_value)[0])
        return shap_values[0], expected_value
    except Exception as e:
        # Best-effort: report the problem and fall back to neutral values.
        st.error(f"Error calculating SHAP values: {str(e)}")
        return np.zeros(len(input_values.columns)), 0.0
def create_background_data(X, n_samples=50):
    """Return a reproducible random subset of the rows of ``X`` as a NumPy array.

    At most ``n_samples`` distinct rows are drawn (all rows if ``X`` has
    fewer). The global NumPy seed is reset to 42 on every call, so repeated
    calls with the same arguments return the same rows. Nothing is cached.
    """
    total_rows = len(X)
    np.random.seed(42)
    # Sampling without replacement cannot ask for more rows than exist.
    picked = np.random.choice(total_rows, size=min(n_samples, total_rows), replace=False)
    return X.iloc[picked].values
def create_waterfall_plot(shap_values, feature_names, base_value, input_data, title):
    """Render a SHAP waterfall chart and return it as an in-memory PNG.

    Args:
        shap_values: per-feature contributions for one sample.
        feature_names: labels for the features, in the same order.
        base_value: the explainer's expected value.
        input_data: the sample's feature values, in the same order.
        title: prefix used in the chart title.

    Returns:
        ``io.BytesIO`` positioned at 0 containing the PNG (300 dpi).
    """
    explanation = shap.Explanation(
        values=shap_values,
        base_values=base_value,
        data=input_data,
        feature_names=list(feature_names),
    )

    fig = plt.figure(figsize=(12, 8))
    try:
        shap.plots.waterfall(explanation, show=False)
        plt.title(f'{title} - Local SHAP Value Contributions')
        plt.tight_layout()

        buf = io.BytesIO()
        plt.savefig(buf, format='png', bbox_inches='tight', dpi=300)
    finally:
        # Always release the figure, even if plotting or saving failed;
        # otherwise pyplot keeps it alive for the lifetime of the process.
        plt.close(fig)

    buf.seek(0)
    return buf
def _row_values(frame, features, row):
    """Return ``{feature: float}`` read from positional ``row`` of ``frame`` for the features it contains."""
    return {name: float(frame[name].iloc[row]) for name in features if name in frame.columns}


def _feature_range(feature, frames):
    """Return ``(min, max)`` of ``feature`` over those ``frames`` that have that column."""
    columns = [frame[feature] for frame in frames if feature in frame.columns]
    return (min(float(col.min()) for col in columns),
            max(float(col.max()) for col in columns))


def _clamp_to_range(value, low, high):
    """Return ``value`` limited to ``[low, high]``; non-finite values become ``low``."""
    if not math.isfinite(value):
        return low
    return min(max(value, low), high)


def main():
    """Render the Streamlit page: inputs, predictions and SHAP waterfall plots.

    Flow: load models and data, pre-fill the inputs from ``Data_syw_r.xlsx``
    (optionally overridden by an uploaded Excel file), show one numeric field
    per feature, and on button press show both predictions with their SHAP
    explanations. Any exception is reported on the page rather than raised.
    """
    st.title("🔄 Waste Properties Predictor")
    st.write("This app predicts both friction angle and cohesion based on waste composition and characteristics.")
    try:
        (friction_model, friction_features, scaler_X_friction, scaler_y_friction,
         cohesion_model, cohesion_features, scaler_X_cohesion, scaler_y_cohesion,
         device, X_friction, X_cohesion) = load_model_and_data()

        # One input field per feature used by either model.
        all_features = sorted(set(friction_features) | set(cohesion_features))

        st.header("Input Parameters")
        uploaded_file = st.file_uploader("Upload Excel file with input values", type=['xlsx', 'xls'])

        input_values = {}

        # Defaults come from the second data row of the bundled dataset
        # (row choice kept as-is; the guard now matches the row being read).
        default_data = pd.read_excel("Data_syw_r.xlsx")
        if len(default_data) > 1:
            input_values.update(_row_values(default_data, all_features, 1))

        # An uploaded file overrides the defaults with its first data row,
        # so a one-row upload works.
        if uploaded_file is not None:
            try:
                df = pd.read_excel(uploaded_file)
                if len(df) > 0:
                    input_values.update(_row_values(df, all_features, 0))
            except Exception as e:
                st.error(f"Error reading file: {str(e)}")

        st.write("Enter the waste composition and characteristics below to predict both friction angle and cohesion.")

        col1, col2 = st.columns(2)
        half = len(all_features) // 2
        adjusted = []
        for i, feature in enumerate(all_features):
            with col1 if i < half else col2:
                min_val, max_val = _feature_range(feature, (X_friction, X_cohesion))

                # st.number_input raises if the initial value lies outside
                # [min_value, max_value]; clamp it and tell the user below.
                requested = input_values.get(feature, 0.0)
                default_value = _clamp_to_range(requested, min_val, max_val)
                if default_value != requested:
                    adjusted.append(feature)

                input_values[feature] = st.number_input(
                    f"{feature}",
                    min_value=min_val,
                    max_value=max_val,
                    value=default_value,
                    format="%.5f",
                    help=f"Range: {min_val:.5f} to {max_val:.5f}"
                )
        if adjusted:
            st.warning(
                "Initial values outside the supported range were adjusted for: "
                + ", ".join(str(name) for name in adjusted)
            )

        # Single-row frames in the column order each model expects.
        friction_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in friction_features]],
                                         columns=friction_features)
        cohesion_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in cohesion_features]],
                                         columns=cohesion_features)

        if st.button("Predict Properties"):
            with st.spinner("Calculating predictions and SHAP values..."):
                friction_prediction = predict_friction(friction_input_df, friction_model, scaler_X_friction, scaler_y_friction, device)
                cohesion_prediction = predict_cohesion(cohesion_input_df, cohesion_model, scaler_X_cohesion, scaler_y_cohesion, device)

                # Fixed seeds so repeated clicks give the same explanation.
                np.random.seed(42)
                torch.manual_seed(42)
                if torch.cuda.is_available():
                    torch.cuda.manual_seed(42)

                # The SHAP background is derived from the training inputs
                # inside calculate_shap_values.
                friction_shap_values, friction_base_value = calculate_shap_values(friction_input_df, friction_model, X_friction, scaler_X_friction, scaler_y_friction, device)
                cohesion_shap_values, cohesion_base_value = calculate_shap_values(cohesion_input_df, cohesion_model, X_cohesion, scaler_X_cohesion, scaler_y_cohesion, device)

                st.header("Prediction Results")
                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Friction Angle", f"{friction_prediction:.5f}°")
                with col2:
                    st.metric("Cohesion", f"{cohesion_prediction:.5f} kPa")

                col1, col2 = st.columns(2)
                with col1:
                    st.subheader("Friction Angle SHAP Analysis")
                    friction_waterfall_plot = create_waterfall_plot(
                        shap_values=friction_shap_values,
                        feature_names=friction_features,
                        base_value=friction_base_value,
                        input_data=friction_input_df.values[0],
                        title="Friction Angle"
                    )
                    st.image(friction_waterfall_plot)
                with col2:
                    st.subheader("Cohesion SHAP Analysis")
                    cohesion_waterfall_plot = create_waterfall_plot(
                        shap_values=cohesion_shap_values,
                        feature_names=cohesion_features,
                        base_value=cohesion_base_value,
                        input_data=cohesion_input_df.values[0],
                        title="Cohesion"
                    )
                    st.image(cohesion_waterfall_plot)
    except Exception as e:
        # Top-level boundary of the page: show the failure instead of a traceback.
        st.error(f"An error occurred: {str(e)}")
        st.info("Please try refreshing the page. If the error persists, contact support.")
if __name__ == "__main__": | |
main() | |