Electro_oneAPI / app_f1.md
deepthiaj's picture
Duplicate from deepthiaj/Electro_MLapp
cd8de1a

A newer version of the Streamlit SDK is available: 1.45.0

Upgrade

import streamlit as st import pandas as pd import pickle import xgboost as xgb import numpy as np import sklearn from sklearn.metrics import confusion_matrix, classification_report import seaborn as sns import matplotlib.pyplot as plt from io import StringIO from scipy import signal import daal4py as d4p import time from sklearn.model_selection import train_test_split

st.title("Automated Diagnosis of Heart Disease from Electro-Cardiogram") st.write('This is a prototype for checking heart health condition. The performance of the model has been achieved using XGboost ML algorithm.') st.write('Please select the data and the model from the dropdown menu on the left panel to see the working of this prototype.')

st.divider()

enc_dat = pd.read_csv("PTB_ECGencoded_dat.csv")

Split the dataset into features (X) and target (y)

X = enc_dat.iloc[:, :-1].values # Features (all columns except the last one) y = enc_dat.iloc[:, -1].values # Target (last column "diagnosis")

Map the existing class labels to the expected class values

class_mapping = {0: 0, 1: 1, 3: 2, 4: 3, 6: 4, 7: 5} mapped_labels = np.array([class_mapping[label] for label in y])

split data into train and test sets

seed = 7 test_size = 0.33 X_train, X_test, y_train, y_test = train_test_split(X, mapped_labels, test_size=test_size, random_state=seed)

Define the model parameters

model_params = { 'objective': 'multi:softmax', 'num_class': 6,
'random_state': 42 }

Create and train the XGBoost model

xgb_model = xgb.XGBClassifier(**model_params) eval_set = [(X_test, y_test)] xgb_model.fit(X_train, y_train, early_stopping_rounds=10, eval_set=eval_set, verbose=True)

DAAL model

daal_model = d4p.get_gbt_model_from_xgboost(xgb_model.get_booster())

st.subheader("Performance evaluation of the Automated Diagnosis Model")

if st.button('ECG analysis of Patient001'): # patient001_signal_analysis() to visualize data analysis of single patient upon a button click st.write('give plots and heart rate analysis. Please upload ECG signal data in specified format below for analysis') # refer PTB website for format # call preprocessing module # call ecg_analysis()

st.divider() # # Evaluate the model on the entire dataset

XGBoost prediction (for accuracy comparison)

t0 = time.time() y_pred = xgb_model.predict(X_test) t1 = time.time() xgb_errors_count = np.count_nonzero(y_pred - np.ravel(y_test))

xgb_total = t1-t0 st.write("Prediction time using XGBoost model is ", xgb_total) accuracy = np.sum(y_pred == y_test) / len(y_test) # Calculate accuracy # print(f"Accuracy: {accuracy}") acc = (accuracy / 1) * 100 st.write("The accuracy of the diagnosis report is: ", acc, "%")

st.divider()

# # Evaluate the model on the entire dataset
# y_pred = loaded_model.predict(X)

# # Calculate evaluation metrics

classification_metrics = classification_report(y_test, y_pred, output_dict=True) st.caption(":blue[Classification Metrics]")

classification_metrics = [classification_metrics]

cm = classification_metrics.insert(0,'metrics')

st.table(classification_metrics)

st.json(classification_metrics)

st.write("1: Myocardial infarction, 2: Bundle branch block, 3: Dysrhythmia , 4: Valvular heart disease, 5: Myocarditis")

st.divider() # # Calculate confusion matrix confusion_mat = confusion_matrix(y_test, y_pred)

st.write("Confusion matrix:")

# # Plot confusion matrix

plt.figure(figsize=(10, 8)) htmap = sns.heatmap(confusion_mat, annot=True, fmt="d", cmap="Blues") plt.title("Confusion Matrix") plt.xlabel("Predicted Class") plt.ylabel("True Class") plt.show() htmap = htmap.figure st.pyplot(htmap)

st.divider() # Format signal info & preprocessing module for generating X[0] to diagnose from an external input data & give a dropbox to enter a single patient ecg data in .dat and .hea format

Make a faster prediction with oneDAL

n_classes = 6

daal_prediction = d4p.gbt_classification_prediction(nClasses = n_classes).compute(X, daal_model).prediction

daal4py prediction for increased performance

daal_predict_algo = d4p.gbt_classification_prediction( nClasses=n_classes, resultsToEvaluate="computeClassLabels", fptype='float' ) t0 = time.time() daal_prediction = daal_predict_algo.compute(X_test, daal_model) t1 = time.time() daal_errors_count = np.count_nonzero(np.ravel(daal_prediction.prediction) - np.ravel(y_test))

d4p_total = t1-t0 st.write("Prediction time using DAAL model is ", xgb_total)

# List all results that you need by placing '|' between them

predict_algo = d4p.gbt_classification_prediction(nClasses = n_classes, resultsToEvaluate = "computeClassLabels|computeClassProbabilities")

daal_prediction = predict_algo.compute(X, daal_model)

# Get probabilities:

probabilities = daal_prediction.probabilities

st.write(probabilities)

# Get labels:

labels = daal_prediction.prediction

st.write(labels)

assert np.absolute(xgb_errors_count - daal_errors_count) = 0

y_test = np.ravel(y_test) daal_prediction = np.ravel(daal_prediction.prediction) xgb_prediction = y_pred

st.subheader("Accuracy & Performance Comparison: XGBoots Prediction vs. Daal4py Prediction") st.write("No accuracy loss!") st.write("\nXGBoost prediction results (first 10 rows):\n", xgb_prediction[0:10]) st.write("\ndaal4py prediction results (first 10 rows):\n", daal_prediction[0:10]) st.write("\nGround truth (first 10 rows):\n", y_test[0:10])

st.write("XGBoost errors count:", xgb_errors_count) st.write("XGBoost accuracy score:", 1 - xgb_errors_count / xgb_prediction.shape[0])

st.write("\ndaal4py errors count:", daal_errors_count) st.write("daal4py accuracy score:", 1 - daal_errors_count / daal_prediction.shape[0])

st.write("\n XGBoost Prediction Time:", xgb_total) st.write("\n daal4py Prediction Time:", d4p_total)

st.write("\nAll looks good!")

st.subheader("Visualizations") st.write("Performance") left = [1,2] pred_times = [xgb_total, d4p_total] tick_label = ['XGBoost Prediction', 'daal4py Prediction']

plt.bar(left, pred_times, tick_label = tick_label, width = 0.5, color = ['red', 'blue'])

plt.xlabel('Prediction Method'); plt.ylabel('time,s'); plt.title('Prediction time,s') plt.show()

plt0 = plt0.figure

st.pyplot(plt0)

st.bar_chart(pred_times) st.write("speedup:",xgb_total/d4p_total) st.write("Accuracy") left = [1,2]

xgb_acc = 1 - xgb_errors_count / xgb_prediction.shape[0] d4p_acc = 1 - daal_errors_count / daal_prediction.shape[0] pred_acc = [xgb_acc, d4p_acc] tick_label = ['XGBoost Prediction', 'daal4py Prediction']

plt.bar(left, pred_acc, tick_label = tick_label, width = 0.5, color = ['red', 'blue'])

plt.xlabel('Prediction Method') plt.ylabel('accuracy, %') plt.title('Prediction Accuracy, %') plt.show()

plt1 = plt1.figure

st.pyplot(plt1)

st.bar_chart(pred_acc) st.write("Accuracy Difference",xgb_acc-d4p_acc)

st.divider()

patient_enc_data = {"Patient001":X[0],"Patient002":X[100],"Patient003":X[200],"Patient004":X[50],"Patient005":X[40],"Patient006":X[30],"Patient007":X[20],"Patient008":X[10],"Patient009":X[60],"Patient010":X[110],"Patient011":X[120],"Patient012":X[130],"Patient013":X[140],"Patient014":X[150],"Patient015":X[160],"Patient016":X[170],"Patient017":X[180],"Patient018":X[190],"Patient019":X[210],"Patient020":X[220],"Patient021":X[21],"Patient022":X[22],"Patient023":X[23],"Patient024":X[24],"Patient025":X[25],"Patient026":X[26],"Patient027":X[27],"Patient028":X[28],"Patient029":X[29],"Patient030":X[31],"Patient031":X[41],"Patient032":X[42],"Patient033":X[43],"Patient034":X[44],"Patient035":X[45],"Patient036":X[46],"Patient037":X[47],"Patient038":X[48],"Patient039":X[49],"Patient040":X[51],"Patient41":X[61],"Patient042":X[62],"Patient043":X[63],"Patient044":X[64],"Patient045":X[65],"Patient046":X[66],"Patient047":X[67],"Patient048":X[68],"Patient049":X[69],"Patient050":X[71], } patient_ecg_sel = st.selectbox( "Select a ECG of a patient from the list", list(patient_enc_data.keys()))

def ecg_analysis(ecg_test_data):

# Classify the test data point
predicted_class = xgb_model.predict(np.array([ecg_test_data]))


st.subheader("Diagnosis Report")


if predicted_class[0] == 0:
    st.write("Sorry, We cannot give your diagnosis report at the moment. Kindly consult a doctor in person.")
elif predicted_class[0] == 1:
    st.write("You are diagnosed with Myocardial infarction.")
    st.write("Kindly consult a doctor to take the necessary treatment.")
elif predicted_class[0] == 2:
    st.write("You are diagnosed with Bundle branch block.")
    st.write("Kindly consult a doctor to take the necessary treatment.")
elif predicted_class[0] == 3:
    st.write("You are diagnosed with Dysrhythmia.")
    st.write("Kindly take consult a doctor to the necessary treatment.")
elif predicted_class[0] == 4:
    st.write("You are diagnosed with Valvular heart disease.") 
    st.write("Kindly consult a doctor to take the necessary treatment.")
elif predicted_class[0] == 5:
    st.write("You are diagnosed with Myocarditis.")  
    st.write("Kindly consult a doctor to take the necessary treatment.")
else:
    st.write("Sorry, We cannot give your diagnosis report at the moment. Kindly consult a doctor in person.")

if st.button("Analyze Raw ECG"):

# if new_data:

# new_patient_data_preprocessing()

# else:

     ecg_train_dat = pd.read_csv("PTB_ECGdata.csv")
     diagnosis_counts = ecg_train_dat["diagnosis"].value_counts()
     st.bar_chart(diagnosis_counts)

def new_patient_data_preprocessing(new_data):

# code to preprocess .dat and .hea files from PTB ecg database, check one from ptb xl as external new data & convert it into .csv & encode to pass it as an argument to call ecg_analysis function
st.write('')

st.write("")

uploaded_file = st.file_uploader("Upload ECG file") if uploaded_file is not None:

# Can be used wherever a "file-like" object is accepted:
dataframe = pd.read_csv(uploaded_file)
st.write(dataframe[:1])
new_patient_data_preprocessing(dataframe)

if st.button("Check Heart health"): ecg_test_data = patient_enc_data[patient_ecg_sel] st.write("Diagnosis report of", patient_ecg_sel) # st_profile_report(ecg_test_data) ecg_analysis(ecg_test_data) else: st.write("Diagnosis report of Patient001") ecg_test_data = X[0]
ecg_analysis(ecg_test_data)