Spaces: Build error

import os

# Limit native math libraries to a single thread and tolerate duplicate
# OpenMP runtimes (a common workaround for libtorch/MKL conflicts)
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['VECLIB_MAXIMUM_THREADS'] = '1'
os.environ['NUMEXPR_NUM_THREADS'] = '1'

import streamlit as st
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shap
from sklearn.preprocessing import MinMaxScaler
import plotly.graph_objects as go
import io
from matplotlib.figure import Figure
import math
import torch.nn.functional as F

# Set page config
st.set_page_config(
    page_title="Waste Properties Predictor",
    page_icon="🔄",
    layout="wide"
)

# Custom CSS to improve the app's appearance
st.markdown("""
    <style>
    .stApp {
        max-width: 1200px;
        margin: 0 auto;
    }
    .main {
        padding: 2rem;
    }
    .stButton>button {
        width: 100%;
    }
    </style>
""", unsafe_allow_html=True)

# Load the trained model and recreate the architecture for both friction and cohesion
class DualStreamNet(torch.nn.Module):
    def __init__(self, input_size):
        super(DualStreamNet, self).__init__()

        # Stream 1: Original MLP
        self.mlp_fc1 = torch.nn.Linear(input_size, 64)
        self.mlp_fc2 = torch.nn.Linear(64, 1000)
        self.mlp_fc3 = torch.nn.Linear(1000, 200)
        self.mlp_fc4 = torch.nn.Linear(200, 8)

        # Stream 2: Feature Attention Mechanism
        self.feature_attention_dim = 16
        self.feature_projection = torch.nn.Linear(input_size, self.feature_attention_dim)
        self.feature_query = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_key = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_value = torch.nn.Linear(self.feature_attention_dim, self.feature_attention_dim)
        self.feature_norm = torch.nn.LayerNorm(self.feature_attention_dim)

        # Stream 3: Batch Attention Mechanism
        self.batch_attention_dim = 16
        self.batch_projection = torch.nn.Linear(input_size, self.batch_attention_dim)
        self.batch_query = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_key = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_value = torch.nn.Linear(self.batch_attention_dim, self.batch_attention_dim)
        self.batch_norm = torch.nn.LayerNorm(self.batch_attention_dim)

        # Feature Attention stream MLP
        self.feature_att_fc1 = torch.nn.Linear(self.feature_attention_dim, 32)
        self.feature_att_fc2 = torch.nn.Linear(32, 8)

        # Batch Attention stream MLP
        self.batch_att_fc1 = torch.nn.Linear(self.batch_attention_dim, 32)
        self.batch_att_fc2 = torch.nn.Linear(32, 8)

        # Concatenated output: 8 from MLP + 8 from feature attention + 8 from batch attention
        self.final_fc = torch.nn.Linear(24, 1)
        self.dropout = torch.nn.Dropout(0.2)

        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, torch.nn.Linear):
            torch.nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                module.bias.data.zero_()

    def feature_attention(self, x):
        # Project input to attention dimension
        projected = self.feature_projection(x)

        # Self-attention mechanism across features
        query = self.feature_query(projected)
        key = self.feature_key(projected)
        value = self.feature_value(projected)

        # Calculate attention scores
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.feature_attention_dim)
        attention_weights = F.softmax(scores, dim=-1)

        # Apply attention weights
        context = torch.matmul(attention_weights, value)

        # Add residual connection and normalize
        context = context + projected
        context = self.feature_norm(context)
        return context
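
    # The block above is standard scaled dot-product attention,
    # softmax(Q K^T / sqrt(d)) V, followed by a residual connection and
    # LayerNorm, i.e. the core of a transformer encoder layer.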

    def batch_attention(self, x):
        batch_size = x.size(0)

        # If batch size is 1, attention over the batch is a no-op,
        # so fall back to the plain batch-stream projection
        if batch_size <= 1:
            return self.batch_projection(x)

        # Project input to attention dimension
        projected = self.batch_projection(x)

        # Self-attention mechanism across the batch dimension
        query = self.batch_query(projected)
        key = self.batch_key(projected)
        value = self.batch_value(projected)

        # Calculate attention scores across the batch dimension;
        # reshape tensors for batch-wise attention
        query_reshaped = query.view(batch_size, -1)  # (batch_size, feature_dim)
        key_reshaped = key.view(batch_size, -1)      # (batch_size, feature_dim)

        # Compute similarity between samples in the batch
        scores = torch.mm(query_reshaped, key_reshaped.t()) / math.sqrt(key_reshaped.size(1))
        attention_weights = F.softmax(scores, dim=1)  # (batch_size, batch_size)

        # Weighted sum of values across the batch dimension
        batch_context = torch.mm(attention_weights, value.view(batch_size, -1))

        # Add residual connection and normalize
        context = batch_context.view_as(projected) + projected
        context = self.batch_norm(context)
        return context
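
    # Note: batch attention shares information across the rows of a single
    # forward batch. This app predicts one sample at a time, so for user
    # inputs the batch_size <= 1 fallback above is the path that runs.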

    def forward(self, x):
        # Stream 1: Original MLP
        mlp_x = F.relu(self.mlp_fc1(x))
        mlp_x = self.dropout(mlp_x)
        mlp_x = F.relu(self.mlp_fc2(mlp_x))
        mlp_x = self.dropout(mlp_x)
        mlp_x = F.relu(self.mlp_fc3(mlp_x))
        mlp_x = self.dropout(mlp_x)
        mlp_x = F.relu(self.mlp_fc4(mlp_x))
        mlp_x = self.dropout(mlp_x)

        # Stream 2: Feature Attention mechanism
        feature_att_x = self.feature_attention(x)
        feature_att_x = F.relu(self.feature_att_fc1(feature_att_x))
        feature_att_x = self.dropout(feature_att_x)
        feature_att_x = F.relu(self.feature_att_fc2(feature_att_x))
        feature_att_x = self.dropout(feature_att_x)

        # Stream 3: Batch Attention mechanism
        batch_att_x = self.batch_attention(x)
        batch_att_x = F.relu(self.batch_att_fc1(batch_att_x))
        batch_att_x = self.dropout(batch_att_x)
        batch_att_x = F.relu(self.batch_att_fc2(batch_att_x))
        batch_att_x = self.dropout(batch_att_x)

        # Concatenate outputs from all three streams and make the final prediction
        combined = torch.cat([mlp_x, feature_att_x, batch_att_x], dim=1)
        output = self.final_fc(combined)
        return output
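
# Minimal shape check (illustrative; input_size=18 assumes all 18 candidate
# columns survive the correlation filter in load_model_and_data below):
#   model = DualStreamNet(input_size=18)
#   model(torch.randn(4, 18)).shape  # torch.Size([4, 1])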

def load_model_and_data():
    # Set device and random seeds
    np.random.seed(32)
    torch.manual_seed(42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load data
    data = pd.read_excel("Data_syw_r.xlsx")  # Updated to use Data_syw_r.xlsx
    X = data.iloc[:, list(range(1, 17)) + list(range(21, 23))]

    # Friction data: keep features with |correlation| > 0.1 against the target
    y_friction = data.iloc[:, 28].values
    correlation_with_friction = abs(X.corrwith(pd.Series(y_friction)))
    selected_features_friction = correlation_with_friction[correlation_with_friction > 0.1].index
    X_friction = X[selected_features_friction]

    # Cohesion data: same feature-selection rule
    y_cohesion = data.iloc[:, 25].values
    correlation_with_cohesion = abs(X.corrwith(pd.Series(y_cohesion)))
    selected_features_cohesion = correlation_with_cohesion[correlation_with_cohesion > 0.1].index
    X_cohesion = X[selected_features_cohesion]

    # Initialize and fit scalers for friction
    scaler_X_friction = MinMaxScaler()
    scaler_y_friction = MinMaxScaler()
    scaler_X_friction.fit(X_friction)
    scaler_y_friction.fit(y_friction.reshape(-1, 1))

    # Initialize and fit scalers for cohesion
    scaler_X_cohesion = MinMaxScaler()
    scaler_y_cohesion = MinMaxScaler()
    scaler_X_cohesion.fit(X_cohesion)
    scaler_y_cohesion.fit(y_cohesion.reshape(-1, 1))

    # Load models (map_location lets GPU-trained checkpoints load on CPU-only hosts)
    friction_model = DualStreamNet(input_size=len(selected_features_friction)).to(device)
    friction_model.load_state_dict(torch.load('best_friction_model.pt', map_location=device))
    friction_model.eval()

    cohesion_model = DualStreamNet(input_size=len(selected_features_cohesion)).to(device)
    cohesion_model.load_state_dict(torch.load('cohebest.pt', map_location=device))
    cohesion_model.eval()

    return (friction_model, X_friction.columns, scaler_X_friction, scaler_y_friction,
            cohesion_model, X_cohesion.columns, scaler_X_cohesion, scaler_y_cohesion,
            device, X_friction, X_cohesion)

def predict_friction(input_values, model, scaler_X, scaler_y, device):
    # Scale input values
    input_scaled = scaler_X.transform(input_values)
    input_tensor = torch.FloatTensor(input_scaled).to(device)

    # Make prediction
    with torch.no_grad():
        prediction_scaled = model(input_tensor)
        prediction = scaler_y.inverse_transform(prediction_scaled.cpu().numpy().reshape(-1, 1))
    return prediction[0][0]


def predict_cohesion(input_values, model, scaler_X, scaler_y, device):
    # Scale input values
    input_scaled = scaler_X.transform(input_values)
    input_tensor = torch.FloatTensor(input_scaled).to(device)

    # Make prediction
    with torch.no_grad():
        prediction_scaled = model(input_tensor)
        prediction = scaler_y.inverse_transform(prediction_scaled.cpu().numpy().reshape(-1, 1))
    return prediction[0][0]

def calculate_shap_values(input_values, model, X, scaler_X, scaler_y, device):
    def model_predict(X):
        X_scaled = scaler_X.transform(X)
        X_tensor = torch.FloatTensor(X_scaled).to(device)
        model.eval()
        with torch.no_grad():
            scaled_predictions = model(X_tensor).cpu().numpy().flatten()
        # Unscale the predictions
        return scaler_y.inverse_transform(scaled_predictions.reshape(-1, 1)).flatten()

    try:
        # Set random seed for reproducibility
        np.random.seed(42)

        # Use k-means for background data
        background = shap.kmeans(X.values, 10)
        explainer = shap.KernelExplainer(model_predict, background)

        # Calculate SHAP values with more samples for stability
        shap_values = explainer.shap_values(input_values.values, nsamples=200)
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])

        # The expected value is already in original units,
        # since model_predict returns unscaled predictions
        expected_value = explainer.expected_value
        if isinstance(expected_value, np.ndarray):
            expected_value = expected_value[0]
        return shap_values[0], expected_value
    except Exception as e:
        st.error(f"Error calculating SHAP values: {str(e)}")
        return np.zeros(len(input_values.columns)), 0.0
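
# Note: KernelExplainer treats the network as a black box, so each call to
# calculate_shap_values evaluates model_predict on many perturbed inputs
# (nsamples=200 coalitions imputed against the 10-cluster background);
# expect the SHAP step, not the prediction itself, to dominate latency.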

def create_background_data(X, n_samples=50):
    """Draw a fixed random subset of X as background data for SHAP."""
    np.random.seed(42)
    # Ensure n_samples is not larger than the dataset
    n_samples = min(n_samples, len(X))
    background_indices = np.random.choice(len(X), size=n_samples, replace=False)
    return X.iloc[background_indices].values

def create_waterfall_plot(shap_values, feature_names, base_value, input_data, title):
    # Create SHAP explanation object
    explanation = shap.Explanation(
        values=shap_values,
        base_values=base_value,
        data=input_data,
        feature_names=list(feature_names)
    )

    # Create figure
    fig = plt.figure(figsize=(12, 8))
    shap.plots.waterfall(explanation, show=False)
    plt.title(f'{title} - Local SHAP Value Contributions')
    plt.tight_layout()

    # Save plot to a buffer
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight', dpi=300)
    plt.close(fig)
    buf.seek(0)
    return buf

def main():
    st.title("🔄 Waste Properties Predictor")
    st.write("This app predicts both friction angle and cohesion based on waste composition and characteristics.")

    try:
        # Load models and data
        (friction_model, friction_features, scaler_X_friction, scaler_y_friction,
         cohesion_model, cohesion_features, scaler_X_cohesion, scaler_y_cohesion,
         device, X_friction, X_cohesion) = load_model_and_data()

        # Create background data for SHAP calculations
        # (calculate_shap_values currently builds its own k-means background,
        # so these subsets are not used downstream)
        friction_background = create_background_data(X_friction)
        cohesion_background = create_background_data(X_cohesion)

        # Combine all unique features
        all_features = sorted(list(set(friction_features) | set(cohesion_features)))

        st.header("Input Parameters")

        # Add file upload option
        uploaded_file = st.file_uploader("Upload Excel file with input values", type=['xlsx', 'xls'])

        # Initialize input values from the data file
        input_values = {}

        # Load default values from the first data row of Data_syw_r.xlsx
        default_data = pd.read_excel("Data_syw_r.xlsx")
        if len(default_data) > 0:
            for feature in all_features:
                if feature in default_data.columns:
                    input_values[feature] = float(default_data[feature].iloc[0])

        # Override with uploaded file if provided
        if uploaded_file is not None:
            try:
                # Read the uploaded file and use its first row
                df = pd.read_excel(uploaded_file)
                if len(df) > 0:
                    for feature in all_features:
                        if feature in df.columns:
                            input_values[feature] = float(df[feature].iloc[0])
            except Exception as e:
                st.error(f"Error reading file: {str(e)}")
| st.write("Enter the waste composition and characteristics below to predict both friction angle and cohesion.") | |
| # Create two columns for input | |
| col1, col2 = st.columns(2) | |
| # Create input fields for each feature | |
| for i, feature in enumerate(all_features): | |
| with col1 if i < len(all_features)//2 else col2: | |
| # Get min and max values considering both friction and cohesion datasets | |
| if feature in X_friction.columns and feature in X_cohesion.columns: | |
| min_val = min(float(X_friction[feature].min()), float(X_cohesion[feature].min())) | |
| max_val = max(float(X_friction[feature].max()), float(X_cohesion[feature].max())) | |
| elif feature in X_friction.columns: | |
| min_val = float(X_friction[feature].min()) | |
| max_val = float(X_friction[feature].max()) | |
| else: | |
| min_val = float(X_cohesion[feature].min()) | |
| max_val = float(X_cohesion[feature].max()) | |
| # Use the value from input_values if available, otherwise use 0 | |
| default_value = input_values.get(feature, 0.0) | |
| input_values[feature] = st.number_input( | |
| f"{feature}", | |
| min_value=min_val, | |
| max_value=max_val, | |
| value=default_value, | |
| format="%.5f", | |
| help=f"Range: {min_val:.5f} to {max_val:.5f}" | |
| ) | |
| # Create DataFrames for both predictions | |
| friction_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in friction_features]], | |
| columns=friction_features) | |
| cohesion_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in cohesion_features]], | |
| columns=cohesion_features) | |

        if st.button("Predict Properties"):
            with st.spinner("Calculating predictions and SHAP values..."):
                # Make predictions
                friction_prediction = predict_friction(friction_input_df, friction_model, scaler_X_friction, scaler_y_friction, device)
                cohesion_prediction = predict_cohesion(cohesion_input_df, cohesion_model, scaler_X_cohesion, scaler_y_cohesion, device)

                # Set random seeds before SHAP calculations
                np.random.seed(42)
                torch.manual_seed(42)
                if torch.cuda.is_available():
                    torch.cuda.manual_seed(42)

                # Calculate SHAP values
                friction_shap_values, friction_base_value = calculate_shap_values(friction_input_df, friction_model, X_friction, scaler_X_friction, scaler_y_friction, device)
                cohesion_shap_values, cohesion_base_value = calculate_shap_values(cohesion_input_df, cohesion_model, X_cohesion, scaler_X_cohesion, scaler_y_cohesion, device)

            # Display results
            st.header("Prediction Results")
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Friction Angle", f"{friction_prediction:.5f}°")
            with col2:
                st.metric("Cohesion", f"{cohesion_prediction:.5f} kPa")

            # Create and display waterfall plots
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Friction Angle SHAP Analysis")
                friction_waterfall_plot = create_waterfall_plot(
                    shap_values=friction_shap_values,
                    feature_names=friction_features,
                    base_value=friction_base_value,
                    input_data=friction_input_df.values[0],
                    title="Friction Angle"
                )
                st.image(friction_waterfall_plot)
            with col2:
                st.subheader("Cohesion SHAP Analysis")
                cohesion_waterfall_plot = create_waterfall_plot(
                    shap_values=cohesion_shap_values,
                    feature_names=cohesion_features,
                    base_value=cohesion_base_value,
                    input_data=cohesion_input_df.values[0],
                    title="Cohesion"
                )
                st.image(cohesion_waterfall_plot)

    except Exception as e:
        st.error(f"An error occurred: {str(e)}")
        st.info("Please try refreshing the page. If the error persists, contact support.")


if __name__ == "__main__":
    main()
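
For reference, the imports above imply the following third-party packages. If the Space's build error comes from unresolved dependencies, a requirements.txt along these lines should cover them; versions are deliberately left unpinned here since the original Space's pins are unknown, openpyxl is assumed because pd.read_excel needs it for .xlsx files, and xlrd would additionally be required if users upload legacy .xls files:

streamlit
torch
numpy
pandas
matplotlib
shap
scikit-learn
plotly
openpyxl

Locally, the app runs with "streamlit run app.py", assuming the script is saved as app.py (the default entry point for Streamlit Spaces).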