# -*- coding: utf-8 -*-
"""
Created on Sun Nov 24 12:47:37 2024

@author: Ashmitha
"""

import io
import os
import tempfile

import numpy as np
import pandas as pd
import gradio as gr
import tensorflow as tf

from scipy.stats import pearsonr
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from xgboost import XGBRegressor

from tensorflow.keras import regularizers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import (BatchNormalization, Conv1D, Dense, Dropout,
                                     Flatten, GRU, LeakyReLU, MaxPooling1D)
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

#-------------------------------------Feature selection---------------------------------------------------------------------------------------------

def RandomForestFeatureSelection(trainX, trainy, num_features=60):
    """Rank features with a random forest and return the indices of the top `num_features`."""
    rf = RandomForestRegressor(n_estimators=1000, random_state=50)
    rf.fit(trainX, trainy)

    # Get feature importances
    importances = rf.feature_importances_

    # Select the top N important features
    indices = np.argsort(importances)[-num_features:]
    return indices
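# Illustrative usage sketch (not part of the pipeline): the arrays below are hypothetical
# synthetic data, shown only to make the expected shapes concrete.
# _demo_X = np.random.rand(100, 500)        # 100 samples, 500 markers
# _demo_y = np.random.rand(100)             # 100 phenotypic values
# _top_idx = RandomForestFeatureSelection(_demo_X, _demo_y, num_features=60)
# _demo_X_selected = _demo_X[:, _top_idx]   # keep only the 60 most important markers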
#----------------------------------------------------------GRU Model---------------------------------------------------------------------

def GRUModel(trainX, trainy, testX, testy, epochs=1000, batch_size=64, learning_rate=0.0001,
             l1_reg=0.001, l2_reg=0.001, dropout_rate=0.2, feature_selection=True):
    # Apply feature selection using a Random Forest Regressor
    if feature_selection:
        # Use RandomForestRegressor to rank features by importance
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)

        # Select features with importance greater than a threshold (e.g., mean importance)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance.")

    # Scale the input data using MinMaxScaler to normalize the feature range
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)

    # Scale the target variable using MinMaxScaler
    target_scaler = MinMaxScaler()
    trainy_scaled = target_scaler.fit_transform(trainy.reshape(-1, 1))  # Reshape to 2D for the scaler

    # Reshape trainX and testX to be 3D: (samples, timesteps, features)
    trainX = trainX_scaled.reshape((trainX.shape[0], 1, trainX.shape[1]))
    if testX is not None:
        testX = testX_scaled.reshape((testX.shape[0], 1, testX.shape[1]))  # Reshape testX if it exists

    model = Sequential()

    # GRU layer
    model.add(GRU(512, input_shape=(trainX.shape[1], trainX.shape[2]), return_sequences=False,
                  kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))

    # Dense layers with Batch Normalization, Dropout and LeakyReLU
    model.add(Dense(256, kernel_initializer='he_normal',
                    kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))

    model.add(Dense(128, kernel_initializer='he_normal',
                    kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))

    model.add(Dense(64, kernel_initializer='he_normal',
                    kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))

    model.add(Dense(32, kernel_initializer='he_normal',
                    kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(BatchNormalization())
    model.add(Dropout(dropout_rate))
    model.add(LeakyReLU(alpha=0.1))

    # Output layer with ReLU activation to prevent negative predictions
    model.add(Dense(1, activation="relu"))

    # Compile the model
    model.compile(loss='mse', optimizer=Adam(learning_rate=learning_rate), metrics=['mse'])

    # Callbacks for learning rate reduction and early stopping
    learning_rate_reduction = ReduceLROnPlateau(monitor='val_loss', patience=10, verbose=1,
                                                factor=0.5, min_lr=1e-6)
    early_stopping = EarlyStopping(monitor='val_loss', verbose=1, restore_best_weights=True,
                                   patience=10)

    # Train the model
    history = model.fit(trainX, trainy_scaled, epochs=epochs, batch_size=batch_size,
                        validation_split=0.1, verbose=1,
                        callbacks=[learning_rate_reduction, early_stopping])

    # Predict train and test
    predicted_train = model.predict(trainX)
    predicted_test = model.predict(testX) if testX is not None else None

    # Flatten predictions
    predicted_train = predicted_train.flatten()
    if predicted_test is not None:
        predicted_test = predicted_test.flatten()
    else:
        predicted_test = np.zeros_like(predicted_train)

    # Inverse scale the predictions to get them back to the original range
    predicted_train = target_scaler.inverse_transform(predicted_train.reshape(-1, 1)).flatten()
    if predicted_test is not None:
        predicted_test = target_scaler.inverse_transform(predicted_test.reshape(-1, 1)).flatten()

    return predicted_train, predicted_test, history
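# Illustrative usage sketch (reusing the synthetic _demo_X / _demo_y from the sketch above):
# GRUModel takes 2-D genotype matrices and 1-D phenotype arrays; the tiny epoch count here
# is only to keep the example quick, not a recommended setting.
# _gru_train_pred, _gru_test_pred, _gru_hist = GRUModel(_demo_X[:80], _demo_y[:80],
#                                                       _demo_X[80:], _demo_y[80:],
#                                                       epochs=5, batch_size=16)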
#-----------------------------------------------------------DeepMap-------------------------------------------------------------------------------

def CNNModel(trainX, trainy, testX, testy, epochs=1000, batch_size=64, learning_rate=0.0001,
             l1_reg=0.0001, l2_reg=0.0001, dropout_rate=0.3, feature_selection=True):
    # Apply feature selection using a Random Forest Regressor
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance.")

    # Scaling the inputs
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)

    # Reshape for CNN input (samples, features, channels)
    trainX = trainX_scaled.reshape((trainX.shape[0], trainX.shape[1], 1))
    if testX is not None:
        testX = testX_scaled.reshape((testX.shape[0], testX.shape[1], 1))

    model = Sequential()

    # Convolutional layers
    model.add(Conv1D(256, kernel_size=3, activation='relu', input_shape=(trainX.shape[1], 1),
                     kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(dropout_rate))

    model.add(Conv1D(128, kernel_size=3, activation='relu',
                     kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(dropout_rate))

    # Flatten and Dense layers
    model.add(Flatten())
    model.add(Dense(64, kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg)))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='linear'))

    # Compile the model
    model.compile(loss='mse', optimizer=Adam(learning_rate=learning_rate), metrics=['mse'])

    # Callbacks
    learning_rate_reduction = ReduceLROnPlateau(monitor='val_loss', patience=5, verbose=1,
                                                factor=0.5, min_lr=1e-6)
    early_stopping = EarlyStopping(monitor='val_loss', verbose=1, restore_best_weights=True,
                                   patience=10)

    # Train the model
    history = model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size,
                        validation_split=0.1, verbose=1,
                        callbacks=[learning_rate_reduction, early_stopping])

    predicted_train = model.predict(trainX).flatten()
    predicted_test = model.predict(testX).flatten() if testX is not None else None

    return predicted_train, predicted_test, history


#-------------------------------------------------------------------------Random Forest----------------------------------------------------

def RFModel(trainX, trainy, testX, testy, n_estimators=100, max_depth=None, feature_selection=True):
    # Apply feature selection using a Random Forest Regressor
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance.")

    # Scaling the feature data
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)

    # Define and train the RandomForest model
    rf_model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    history = rf_model.fit(trainX_scaled, trainy)  # fit() returns the fitted estimator itself

    # Predictions
    predicted_train = rf_model.predict(trainX_scaled)
    predicted_test = rf_model.predict(testX_scaled) if testX is not None else None

    return predicted_train, predicted_test, history
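# Illustrative usage sketch (hypothetical synthetic data): CNNModel and RFModel follow the
# same (trainX, trainy, testX, testy, ...) calling convention as GRUModel, so they can be
# swapped freely inside the cross-validation loop further below.
# _rf_train_pred, _rf_test_pred, _rf_fit = RFModel(_demo_X[:80], _demo_y[:80],
#                                                  _demo_X[80:], _demo_y[80:],
#                                                  n_estimators=200)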
#------------------------------------------------------------------------------XGboost---------------------------------------------------------------

def XGBoostModel(trainX, trainy, testX, testy, learning_rate, min_child_weight,
                 feature_selection=True, n_estimators=100, max_depth=None):
    # Apply feature selection using a Random Forest Regressor
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(trainX, trainy)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        trainX = selector.transform(trainX)
        if testX is not None:
            testX = selector.transform(testX)
        print(f"Selected {trainX.shape[1]} features based on feature importance")

    # trainy_log = np.log1p(trainy)  # Log-transform to handle large phenotypic values
    # if testy is not None:
    #     testy_log = np.log1p(testy)

    # Scale the features
    scaler = MinMaxScaler()
    trainX_scaled = scaler.fit_transform(trainX)
    if testX is not None:
        testX_scaled = scaler.transform(testX)

    # Define and train the XGBoost model
    # xgb_model = XGBRegressor(n_estimators=n_estimators, max_depth=100, random_state=42)
    # xgb_model = XGBRegressor(objective='reg:linear', n_estimators=100, seed=100)
    xgb_model = XGBRegressor(objective="reg:squarederror", random_state=42)
    history = xgb_model.fit(trainX_scaled, trainy)  # fit on the same scaled features used for prediction

    # Note: this grid is not wired into a hyper-parameter search; it is kept only as a
    # record of candidate settings.
    param_grid = {
        "learning_rate": 0.01,
        "max_depth": 10,
        "n_estimators": 100,
        "min_child_weight": 5
    }

    # Predictions
    predicted_train = xgb_model.predict(trainX_scaled)
    predicted_test = xgb_model.predict(testX_scaled) if testX is not None else None

    return predicted_train, predicted_test, history


#----------------------------------------reading file----------------------------------------------------------------------------------------

# Helper function to read the uploaded CSV file
def read_csv_file(uploaded_file):
    if uploaded_file is not None:
        if hasattr(uploaded_file, 'data'):    # For NamedBytes
            return pd.read_csv(io.BytesIO(uploaded_file.data))
        elif hasattr(uploaded_file, 'name'):  # For NamedString
            return pd.read_csv(uploaded_file.name)
    return None


#-----------------------------------------------------------------calculate topsis score--------------------------------------------------------

def calculate_topsis_score(df):
    # Normalize the metrics
    metrics = df[['Train_MSE', 'Train_RMSE', 'Train_R2', 'Train_Corr']].dropna()  # Ensure no NaN values
    norm_metrics = metrics / np.sqrt((metrics ** 2).sum(axis=0))

    # Define ideal best and worst for each metric
    ideal_best = pd.Series(index=norm_metrics.columns, dtype=float)
    ideal_worst = pd.Series(index=norm_metrics.columns, dtype=float)

    # For MSE and RMSE (minimization criteria): min is best, max is worst
    for col in ['Train_MSE', 'Train_RMSE']:
        ideal_best[col] = norm_metrics[col].min()
        ideal_worst[col] = norm_metrics[col].max()

    # For R2 and Corr (maximization criteria): max is best, min is worst
    for col in ['Train_R2', 'Train_Corr']:
        ideal_best[col] = norm_metrics[col].max()
        ideal_worst[col] = norm_metrics[col].min()

    # Calculate Euclidean distance to the ideal best and worst
    dist_to_best = np.sqrt(((norm_metrics - ideal_best) ** 2).sum(axis=1))
    dist_to_worst = np.sqrt(((norm_metrics - ideal_worst) ** 2).sum(axis=1))

    # Calculate the TOPSIS score (relative closeness to the ideal solution)
    topsis_score = dist_to_worst / (dist_to_best + dist_to_worst)

    df['TOPSIS_Score'] = np.nan                           # Initialize with NaN
    df.loc[metrics.index, 'TOPSIS_Score'] = topsis_score  # Assign TOPSIS scores
    return df
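# Illustrative sketch (made-up numbers): calculate_topsis_score expects the four Train_*
# columns below and appends a TOPSIS_Score column ranking models by closeness to the ideal
# (low error, high R2/correlation).
# _demo_metrics = pd.DataFrame({
#     'Model': ['GRUModel', 'RFModel'],
#     'Train_MSE': [0.10, 0.25],
#     'Train_RMSE': [0.32, 0.50],
#     'Train_R2': [0.85, 0.70],
#     'Train_Corr': [0.92, 0.84],
# })
# print(calculate_topsis_score(_demo_metrics)[['Model', 'TOPSIS_Score']])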
#--------------------------------------------------- Nested Cross validation---------------------------------------------------------------------------

def NestedKFoldCrossValidation(
        training_data,
        training_additive,
        testing_data,
        testing_additive,
        training_dominance,
        testing_dominance,
        epochs,
        learning_rate,
        min_child_weight,
        batch_size=64,
        outer_n_splits=2,
        output_file='cross_validation_results.csv',
        predicted_phenotype_file='predicted_phenotype.csv',
        feature_selection=True
):
    if 'phenotypes' not in training_data.columns:
        raise ValueError("Training data does not contain the 'phenotypes' column.")

    # Remove Sample ID columns from additive and dominance data
    training_additive = training_additive.iloc[:, 1:]
    testing_additive = testing_additive.iloc[:, 1:]
    training_dominance = training_dominance.iloc[:, 1:]
    testing_dominance = testing_dominance.iloc[:, 1:]

    # Merge training and testing data with additive and dominance components
    training_data_merged = pd.concat([training_data, training_additive, training_dominance], axis=1)
    testing_data_merged = pd.concat([testing_data, testing_additive, testing_dominance], axis=1)

    phenotypic_info = training_data['phenotypes'].values
    phenotypic_test_info = testing_data['phenotypes'].values if 'phenotypes' in testing_data.columns else None
    sample_ids = testing_data.iloc[:, 0].values

    training_genotypic_data_merged = training_data_merged.iloc[:, 2:].values
    testing_genotypic_data_merged = testing_data_merged.iloc[:, 2:].values

    # Feature selection
    if feature_selection:
        rf = RandomForestRegressor(n_estimators=100, random_state=65)
        rf.fit(training_genotypic_data_merged, phenotypic_info)
        selector = SelectFromModel(rf, threshold="mean", prefit=True)
        training_genotypic_data_merged = selector.transform(training_genotypic_data_merged)
        testing_genotypic_data_merged = selector.transform(testing_genotypic_data_merged)
        print(f"Selected {training_genotypic_data_merged.shape[1]} features based on importance.")

    # Standardize the genotypic data
    scaler = StandardScaler()
    training_genotypic_data_merged = scaler.fit_transform(training_genotypic_data_merged)
    testing_genotypic_data_merged = scaler.transform(testing_genotypic_data_merged)

    outer_kf = KFold(n_splits=outer_n_splits)
    results = []
    all_predicted_phenotypes = []

    def calculate_metrics(true_values, predicted_values):
        mse = mean_squared_error(true_values, predicted_values)
        rmse = np.sqrt(mse)
        r2 = r2_score(true_values, predicted_values)
        corr = pearsonr(true_values, predicted_values)[0]
        return mse, rmse, r2, corr

    models = [
        ('GRUModel', GRUModel),
        ('CNNModel', CNNModel),
        ('RFModel', RFModel),
        ('XGBoostModel', XGBoostModel)
    ]

    for outer_fold, (outer_train_index, outer_test_index) in enumerate(outer_kf.split(phenotypic_info), 1):
        outer_trainX = training_genotypic_data_merged[outer_train_index]
        outer_trainy = phenotypic_info[outer_train_index]
        outer_testX = testing_genotypic_data_merged
        outer_testy = phenotypic_test_info

        for model_name, model_func in models:
            print(f"Running model: {model_name} for fold {outer_fold}")

            if model_name in ['GRUModel', 'CNNModel']:
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy,
                                                                      outer_testX, outer_testy,
                                                                      epochs=epochs, batch_size=batch_size)
            elif model_name in ['RFModel']:
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy,
                                                                      outer_testX, outer_testy)
            else:
                predicted_train, predicted_test, history = model_func(outer_trainX, outer_trainy,
                                                                      outer_testX, outer_testy,
                                                                      learning_rate, min_child_weight)

            # Calculate metrics
            mse_train, rmse_train, r2_train, corr_train = calculate_metrics(outer_trainy, predicted_train)
            mse_test, rmse_test, r2_test, corr_test = (calculate_metrics(outer_testy, predicted_test)
                                                       if outer_testy is not None
                                                       else (None, None, None, None))

            results.append({
                'Model': model_name,
                'Fold': outer_fold,
                'Train_MSE': mse_train,
                'Train_RMSE': rmse_train,
                'Train_R2': r2_train,
                'Train_Corr': corr_train,
                'Test_MSE': mse_test,
                'Test_RMSE': rmse_test,
                'Test_R2': r2_test,
                'Test_Corr': corr_test
            })

            if predicted_test is not None:
                predicted_test_df = pd.DataFrame({
                    'Sample_ID': sample_ids,
                    'Predicted_Phenotype': predicted_test,
                    'Model': model_name
                })
                all_predicted_phenotypes.append(predicted_test_df)

    # Compile results
    results_df = pd.DataFrame(results)
    avg_results_df = results_df.groupby('Model').agg({
        'Train_MSE': 'mean', 'Train_RMSE': 'mean', 'Train_R2': 'mean', 'Train_Corr': 'mean',
        'Test_MSE': 'mean', 'Test_RMSE': 'mean', 'Test_R2': 'mean', 'Test_Corr': 'mean'
    }).reset_index()
    # Calculate the TOPSIS score for the average metrics (considering MSE, RMSE, R² and correlation)
    def calculate_topsis_score(df):
        # Min-max normalize the metric columns
        norm_df = (df.iloc[:, 1:] - df.iloc[:, 1:].min()) / (df.iloc[:, 1:].max() - df.iloc[:, 1:].min())

        # Positive and negative ideal solutions: error metrics (MSE/RMSE) are minimization
        # criteria, while R² and correlation are maximization criteria
        error_cols = [col for col in norm_df.columns if 'MSE' in col]
        ideal_positive = norm_df.max(axis=0)
        ideal_negative = norm_df.min(axis=0)
        ideal_positive[error_cols] = norm_df[error_cols].min(axis=0)
        ideal_negative[error_cols] = norm_df[error_cols].max(axis=0)

        # Calculate the Euclidean distances to the ideal solutions
        dist_positive = np.sqrt(((norm_df - ideal_positive) ** 2).sum(axis=1))
        dist_negative = np.sqrt(((norm_df - ideal_negative) ** 2).sum(axis=1))

        # Calculate the TOPSIS score (relative closeness to the positive ideal)
        topsis_score = dist_negative / (dist_positive + dist_negative)

        # Add the TOPSIS score to the dataframe
        df['TOPSIS_Score'] = topsis_score
        return df

    avg_results_df = calculate_topsis_score(avg_results_df)

    # Save the averaged results with TOPSIS scores to the file
    avg_results_df.to_csv(output_file, index=False)

    # Save predicted phenotypes
    if all_predicted_phenotypes:
        predicted_all_df = pd.concat(all_predicted_phenotypes, axis=0, ignore_index=True)
        predicted_all_df.to_csv(predicted_phenotype_file, index=False)

    return avg_results_df, predicted_all_df if all_predicted_phenotypes else None

    # Earlier per-fold output, kept for reference:
    # results_df.to_csv(output_file, index=False)
    # if all_predicted_phenotypes:
    #     predicted_all_df = pd.concat(all_predicted_phenotypes, axis=0, ignore_index=True)
    #     predicted_all_df.to_csv(predicted_phenotype_file, index=False)
    # return results_df, predicted_all_df if all_predicted_phenotypes else None


#--------------------------------------------------------------------Gradio interface---------------------------------------------------------------

def run_cross_validation(training_file, training_additive_file, testing_file, testing_additive_file,
                         training_dominance_file, testing_dominance_file, feature_selection,
                         learning_rate=0.001, min_child_weight=5):
    # Default parameters (the learning_rate and min_child_weight arguments are currently
    # overridden by these fixed values)
    epochs = 1000
    batch_size = 64
    inner_n_splits = 2
    min_child_weight = 5
    learning_rate = 0.001

    # Load datasets
    training_data = pd.read_csv(training_file.name)
    training_additive = pd.read_csv(training_additive_file.name)
    testing_data = pd.read_csv(testing_file.name)
    testing_additive = pd.read_csv(testing_additive_file.name)
    training_dominance = pd.read_csv(training_dominance_file.name)
    testing_dominance = pd.read_csv(testing_dominance_file.name)

    # Call the cross-validation function
    results, predicted_phenotypes = NestedKFoldCrossValidation(
        training_data=training_data,
        training_additive=training_additive,
        testing_data=testing_data,
        testing_additive=testing_additive,
        training_dominance=training_dominance,
        testing_dominance=testing_dominance,
        epochs=epochs,
        batch_size=batch_size,
        learning_rate=learning_rate,
        min_child_weight=min_child_weight,
        feature_selection=feature_selection
    )

    # Save outputs
    results_file = "cross_validation_results.csv"
    predicted_file = "predicted_phenotype.csv"
    results.to_csv(results_file, index=False)
    predicted_phenotypes.to_csv(predicted_file, index=False)

    return results_file, predicted_file
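# Illustrative sketch (hypothetical file names): NestedKFoldCrossValidation can also be
# driven directly from pandas DataFrames, bypassing the Gradio interface, e.g. for a batch run.
# _train = pd.read_csv("train_phenogeno.csv")      # Sample_ID, phenotypes, markers...
# _train_add = pd.read_csv("train_additive.csv")
# _train_dom = pd.read_csv("train_dominance.csv")
# _test = pd.read_csv("test_phenogeno.csv")
# _test_add = pd.read_csv("test_additive.csv")
# _test_dom = pd.read_csv("test_dominance.csv")
# _avg_results, _preds = NestedKFoldCrossValidation(
#     _train, _train_add, _test, _test_add, _train_dom, _test_dom,
#     epochs=50, learning_rate=0.001, min_child_weight=5)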
# Gradio interface
with gr.Blocks() as interface:
    gr.Markdown("# DeepMap - An Integrated GUI for Genotype to Phenotype Prediction")

    with gr.Row():
        training_file = gr.File(label="Upload Training Data (CSV)")
        training_additive_file = gr.File(label="Upload Training Additive Data (CSV)")
        training_dominance_file = gr.File(label="Upload Training Dominance Data (CSV)")

    with gr.Row():
        testing_file = gr.File(label="Upload Testing Data (CSV)")
        testing_additive_file = gr.File(label="Upload Testing Additive Data (CSV)")
        testing_dominance_file = gr.File(label="Upload Testing Dominance Data (CSV)")

    with gr.Row():
        feature_selection = gr.Checkbox(label="Enable Feature Selection", value=True)

    output1 = gr.File(label="Cross-Validation Results (CSV)")
    output2 = gr.File(label="Predicted Phenotypes (CSV)")

    submit_btn = gr.Button("Run DeepMap")
    submit_btn.click(
        run_cross_validation,
        inputs=[
            training_file, training_additive_file, testing_file, testing_additive_file,
            training_dominance_file, testing_dominance_file, feature_selection
        ],
        outputs=[output1, output2]
    )

# Launch the interface
interface.launch()
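# Optional (assumption, not part of the original script): launch arguments such as a public
# share link or a fixed port can be passed instead, e.g.
# interface.launch(share=True, server_port=7860)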