# Standard library
import json
import os
import random
import shutil
from datetime import datetime, timedelta

# Third party. The relative order of these imports is kept as it was so that
# any import-time side effects of the native libraries are unchanged.
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import seaborn as sns
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    roc_auc_score,
    f1_score,
    confusion_matrix,
    precision_recall_curve,
    roc_curve,
    ConfusionMatrixDisplay,
)
from sklearn.model_selection import (
    train_test_split,
    cross_val_score,
    GridSearchCV,
    cross_val_predict,
    StratifiedKFold,
)
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from sklearn.decomposition import PCA
import plotly.express as px
from scipy.interpolate import griddata
import sklearn
from sklearn.tree import DecisionTreeClassifier
from sentence_transformers import SentenceTransformer
from sklearn import tree
import gradio as gr
import plotly.io as pio
import joblib

# ---------------------------------------------------------------------------
# Load models
# ---------------------------------------------------------------------------
# NOTE: joblib.load unpickles its input; only load model files you trust.
autoencoder = keras.models.load_model('models/autoencoder')
classifier = keras.models.load_model('models/classifier')
decision_tree = joblib.load("models/decision_tree_model.pkl")
llm_model = SentenceTransformer(r"sentence-transformers/paraphrase-MiniLM-L6-v2")
pca_2d_llm_clusters = joblib.load('models/pca_llm_model.pkl')
print("models loaded")

# ---------------------------------------------------------------------------
# Normalization constants (min and max of the training split)
# ---------------------------------------------------------------------------
dataframe = pd.read_csv('ecg.csv', header=None)
# Column 140 holds the label; flip it so that an original 0 becomes 1 and
# every other value becomes 0.
dataframe[140] = (dataframe[140] == 0).astype(int)
df_ecg = dataframe[list(range(140))]  # the 140 signal columns
ecg_raw_data = df_ecg.values
labels = dataframe.values[:, -1]
ecg_data = ecg_raw_data[:, :]

# Fixed random_state so the training split (and therefore min/max) is
# reproducible from run to run.
train_data, test_data, train_labels, test_labels = train_test_split(
    ecg_data, labels, test_size=0.2, random_state=21
)
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)
print("constant computing: OK")

# ---------------------------------------------------------------------------
# 2-D PCA of the autoencoder latent space (background of the latent plot)
# ---------------------------------------------------------------------------
ecg_data = (ecg_data - min_val) / (max_val - min_val)
ecg_data = tf.cast(ecg_data, tf.float32)
print(ecg_data.shape)
X = autoencoder.encoder(ecg_data).numpy()

n_components = 2
pca = PCA(n_components=n_components)
X_compressed = pca.fit_transform(X)

column_names = [f"Feature{i + 1}" for i in range(n_components)]
categories = ["normal", "heart disease"]
target_categorical = pd.Categorical.from_codes(labels.astype(int), categories=categories)
df_compressed = pd.DataFrame(X_compressed, columns=column_names)
df_compressed["target"] = target_categorical
print("PCA: done")

# Dataset used to draw the decision-tree probability map
df_plot = pd.read_csv("df_mappa.csv", sep=",", header=0)
print("df map for decision tree loaded.")

# Dataset with the PCA projection of the LLM embeddings
df_pca_llm = pd.read_csv("df_PCA_llm.csv", sep=",", header=0)


# ---------------------------------------------------------------------------
# Useful functions
# ---------------------------------------------------------------------------
# Per-column substitutions that turn coded clinical values into the wording
# used in the sentences fed to the language model.
_CLINICAL_VALUE_LABELS = {
    "ExerciseAngina": {
        'N': 'No',
        'Y': 'exercise-induced angina',
    },
    "FastingBS": {
        0: 'Not Diabetic',
        1: 'High fasting blood sugar',
    },
    "Sex": {
        'M': 'Man',
        'F': 'Female',
    },
    "ChestPainType": {
        'ATA': 'Atypical',
        'NAP': 'Non-Anginal Pain',
        'ASY': 'Asymptomatic',
        'TA': 'Typical Angina',
    },
    "RestingECG": {
        'Normal': 'Normal',
        'ST': 'ST-T wave abnormality',
        'LVH': 'Probable left ventricular hypertrophy',
    },
    "ST_Slope": {
        'Up': 'Up',
        'Flat': 'Flat',
        'Down': 'Downsloping',
    },
}


def df_encoding(df):
    """Replace coded clinical values in *df* with descriptive labels.

    The columns ExerciseAngina, FastingBS, Sex, ChestPainType, RestingECG and
    ST_Slope are rewritten in place; values without a mapping are left as
    they are.

    Args:
        df: DataFrame containing the six columns listed above.

    Returns:
        The same DataFrame object, after modification.

    Raises:
        KeyError: if one of the expected columns is missing.
    """
    for column, mapping in _CLINICAL_VALUE_LABELS.items():
        # Assign the result back instead of `df.col.replace(..., inplace=True)`:
        # the chained in-place form acts on a temporary object and does not
        # update `df` when pandas Copy-on-Write is enabled.
        df[column] = df[column].replace(mapping)
    return df


def compile_text_no_target(x):
    """Build the sentence describing one patient record (target excluded).

    Args:
        x: mapping / DataFrame row with the keys Age, Sex, ChestPainType,
            RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR,
            ExerciseAngina, Oldpeak and ST_Slope.

    Returns:
        The formatted sentence as a string.
    """
    # NOTE(review): the original template was a triple-quoted string whose
    # internal line breaks / indentation are not recoverable here; fields are
    # joined with single spaces. Confirm this matches the text used when the
    # downstream PCA/cluster model was fitted.
    text = (
        f"Age: {x['Age']}, Sex: {x['Sex']}, Chest Pain Type: {x['ChestPainType']}, "
        f"RestingBP: {x['RestingBP']}, Cholesterol: {x['Cholesterol']}, "
        f"FastingBS: {x['FastingBS']}, RestingECG: {x['RestingECG']}, "
        f"MaxHR: {x['MaxHR']} Exercise Angina: {x['ExerciseAngina']}, "
        f"Old peak: {x['Oldpeak']}, ST_Slope: {x['ST_Slope']} "
    )
    return text


def LLM_transform(df, model=llm_model):
    """Embed every row of *df* with the sentence-transformer model.

    Each row is converted to text with ``compile_text_no_target`` and encoded
    with normalized embeddings.

    Args:
        df: DataFrame of patient records (already passed through
            ``df_encoding`` by the caller in this file).
        model: object exposing ``encode(sentences=..., ...)``; defaults to the
            module-level ``llm_model``.

    Returns:
        DataFrame with one row per input row and one column per embedding
        dimension.
    """
    sentences = df.apply(compile_text_no_target, axis=1).tolist()
    output = model.encode(
        sentences=sentences,
        show_progress_bar=True,
        normalize_embeddings=True,
    )
    return pd.DataFrame(output)


def upload_ecg(file):
    """Store an uploaded CSV as the only file in the ``current_ecg`` folder.

    Args:
        file: uploaded-file object whose ``name`` attribute is the temporary
            path of the upload.

    Returns:
        A confirmation message for the UI.

    Raises:
        Exception: whatever ``pandas.read_csv`` raises if the upload cannot
            be parsed as CSV (the stored file is then left untouched apart
            from the cleanup of previous uploads).
    """
    destination_directory = 'current_ecg'
    # The folder may not exist on a fresh checkout; os.listdir would fail.
    os.makedirs(destination_directory, exist_ok=True)

    leftovers = os.listdir(destination_directory)
    if leftovers:
        # Remove files left by a previous upload (best effort: a failure is
        # reported but does not abort the new upload).
        try:
            for filename in leftovers:
                file_path = os.path.join(destination_directory, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
            print("I file nella cartella 'current_ecg' sono stati eliminati.")
        except OSError as e:
            print(f"Errore nell'eliminazione dei file: {str(e)}")

    # Parse the upload once so that a file that is not readable as CSV is
    # rejected here rather than later during the analysis.
    pd.read_csv(file.name, header=None)

    # Copy the upload, keeping its file name, into the destination folder.
    destination_file_path = os.path.join(
        destination_directory, os.path.basename(file.name)
    )
    shutil.copy(file.name, destination_file_path)
    return "Your ECG is ready, you can analyze it!"
# Timestamp layout shared by status.json ("last_datetime") and the ECG CSV
# file names. It carries no year, so parsed values all fall in the same
# default year and comparisons are only meaningful within one year.
_ECG_TIMESTAMP_FORMAT = "%B_%d_%H_%M_%S"

# Category order defines the integer codes used on the decision-tree map axes
# and the order of the one-hot columns expected by the decision tree.
_SLOPE_LEVELS = ['Up', 'Flat', 'Down']
_CHEST_PAIN_LEVELS = ['ASY', 'ATA', 'NAP', 'TA']


def _parse_ecg_timestamp(text):
    """Return the datetime encoded in *text*, or None if it does not match."""
    try:
        return datetime.strptime(text, _ECG_TIMESTAMP_FORMAT)
    except ValueError:
        return None


def ecg_availability(patient_name):
    """Tell whether a patient has ECG files newer than the last recorded one.

    Reads ``PATIENT/<patient_name>/status.json`` and compares its
    ``last_datetime`` with the timestamps encoded in the names of the CSV
    files of the same folder.

    Args:
        patient_name: name of the patient's folder under ``PATIENT``.

    Returns:
        None when no patient is selected or the status file is missing,
        otherwise a human-readable status message.

    Raises:
        ValueError: if ``last_datetime`` in status.json does not follow the
            expected timestamp layout.
    """
    if not patient_name:
        # Nothing selected (e.g. the dropdown was cleared).
        return None

    folder_path = os.path.join("PATIENT", patient_name)
    status_file_path = os.path.join(folder_path, "status.json")
    if not os.path.isfile(status_file_path):
        return None

    with open(status_file_path, 'r') as status_file:
        status_data = json.load(status_file)

    last_datetime_str = status_data.get("last_datetime", None)
    if last_datetime_str is None:
        # No ECG has been recorded as seen yet.
        return "New ECG available"

    last_datetime = datetime.strptime(last_datetime_str, _ECG_TIMESTAMP_FORMAT)

    # CSV files whose name is not a timestamp are ignored instead of
    # aborting the whole check.
    csv_timestamps = (
        _parse_ecg_timestamp(entry.split('.')[0])
        for entry in os.listdir(folder_path)
        if entry.endswith(".csv")
    )
    has_newer_ecg = any(
        stamp is not None and stamp > last_datetime for stamp in csv_timestamps
    )

    if has_newer_ecg:
        return f"New ECG available (last ECG: {last_datetime})"
    return f"No ECG available (last ECG: {last_datetime})"


def _level_code(value, levels):
    """Return the position of *value* in *levels*, or None if absent."""
    return levels.index(value) if value in levels else None


def _one_hot_tree_features(df_tree):
    """One-hot encode ST_Slope and ChestPainType in the model's column order."""
    encoded = {}
    for level in _SLOPE_LEVELS:
        encoded[f"ST_Slope_{level}"] = (df_tree["ST_Slope"] == level).astype(int)
    for level in _CHEST_PAIN_LEVELS:
        encoded[f"ChestPainType_{level}"] = (df_tree["ChestPainType"] == level).astype(int)
    # Dict insertion order gives: ST_Slope_Up, ST_Slope_Flat, ST_Slope_Down,
    # ChestPainType_ASY, ChestPainType_ATA, ChestPainType_NAP, ChestPainType_TA.
    return pd.DataFrame(encoded)


def _latent_space_figure(encoding):
    """Scatter the reference latent space and mark the patient's encoding."""
    # NOTE(review): the keys 0/1 of color_discrete_map do not match the values
    # of the 'target' column ("normal" / "heart disease"), and 'Target' does
    # not match the column name 'target' — kept as in the original; confirm
    # the intended colours/labels.
    fig = px.scatter(
        df_compressed,
        x='Feature1',
        y='Feature2',
        color='target',
        color_discrete_map={0: 'red', 1: 'blue'},
        labels={'Target': 'Binary Target'},
        size_max=18,
    )
    fig.update_layout(xaxis_title='component 1', yaxis_title='component 2')

    # Project the patient's latent vector with the PCA fitted on the
    # reference data and draw it as a star.
    projected = pca.transform(encoding.reshape(1, -1))
    fig.add_trace(go.Scatter(
        x=[projected[0][0]],
        y=[projected[0][1]],
        mode='markers',
        marker=dict(symbol='star', color='black', size=15),
        name='actual patient',
    ))

    # Force the first trace to the plain "scatter" type (presumably to avoid
    # a WebGL trace type chosen by plotly express — TODO confirm).
    spec = fig.to_dict()
    spec["data"][0]["type"] = "scatter"
    return go.Figure(spec)


def _reconstruction_figure(signal, reconstruction):
    """Plot an input heartbeat, its reconstruction and the area between them."""
    samples = np.arange(len(signal))
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=samples, y=signal, fill=None, mode='lines', name='Input',
        line=dict(color='black', width=3)))
    fig.add_trace(go.Scatter(
        x=samples, y=reconstruction, fill=None, mode='lines',
        name='Reconstruction', line=dict(color='red', width=3)))

    # Closed outline: forward along the reconstruction, back along the input.
    fill_x = list(samples) + list(reversed(samples))
    fill_y = list(reconstruction) + list(reversed(signal))
    fig.add_trace(go.Scatter(
        x=fill_x, y=fill_y, fill='tozeroy',
        fillcolor='rgba(255, 182, 193, 10.0)', mode='lines',
        line=dict(color='rgba(255, 182, 193, 0.5)', width=0), name='Error'))

    # Legend outside the plotting area.
    fig.update_layout(legend=dict(x=1.1, y=1.05))
    return fig


def _decision_tree_figure(slope_code, chest_pain_code, marker_label):
    """Draw the decision-tree probability map and the patient's position.

    *slope_code* / *chest_pain_code* are positions in ``_SLOPE_LEVELS`` /
    ``_CHEST_PAIN_LEVELS`` (None when the value is not a known level, in
    which case no marker is drawn).
    """
    # Global seaborn/matplotlib styling, applied before the figure is created.
    sns.set(font_scale=2)
    sns.set_style("white")

    fig_tree = plt.figure()
    ax = fig_tree.add_subplot(111)

    # Structured grid covering the coded categories with a half-unit margin.
    x1 = np.linspace(df_plot['ST_Slope'].min() - 0.5, df_plot['ST_Slope'].max() + 0.5)
    x2 = np.linspace(df_plot['ChestPainType'].min() - 0.5, df_plot['ChestPainType'].max() + 0.5)
    X1, X2 = np.meshgrid(x1, x2)

    # Nearest-neighbour interpolation of 'Prob' onto the grid.
    points = df_plot[['ST_Slope', 'ChestPainType']].values
    values = df_plot['Prob'].values
    Z = griddata(points, values, (X1, X2), method='nearest')

    contour = ax.contourf(X1, X2, Z, cmap='coolwarm', levels=10)
    fig_tree.colorbar(contour, ax=ax, label='Predicted Probability')

    if slope_code is not None and chest_pain_code is not None:
        ax.scatter([slope_code], [chest_pain_code], c="k", edgecolor='k',
                   marker='o', label=marker_label)

    # Replace numeric ticks with the category names.
    ax.set_xticks([])
    ax.set_yticks([])
    for position, level in enumerate(_SLOPE_LEVELS):
        ax.text(float(position), -0.7, level, ha='center', fontsize=15)
    for position, level in enumerate(_CHEST_PAIN_LEVELS):
        ax.text(-0.62, float(position), level, rotation='vertical',
                va='center', fontsize=15)

    ax.set_xlabel('ST_Slope', fontsize=15, labelpad=20)
    ax.set_ylabel('ChestPainType', fontsize=15, labelpad=20)
    return fig_tree


def _llm_cluster_figure(patient_points):
    """Plot the reference clusters and the patient point(s) on top of them."""
    # Plotly is used here because with matplotlib the legend ended up outside
    # of the image rendered by gradio.
    fig = go.Figure()
    for cluster in df_pca_llm['cluster'].unique():
        cluster_data = df_pca_llm[df_pca_llm['cluster'] == cluster]
        fig.add_trace(go.Scatter(
            x=cluster_data['comp1'], y=cluster_data['comp2'],
            mode='markers', name=f'Cluster {cluster}'))

    fig.update_traces(marker=dict(size=12))
    fig.update_xaxes(title_text="Principal Component 1")
    fig.update_yaxes(title_text="Principal Component 2")

    fig.add_trace(go.Scatter(
        x=patient_points['comp1'], y=patient_points['comp2'],
        mode='markers', name='Patient',
        marker=dict(size=12, symbol='diamond', line=dict(width=2, color='Black'))))

    # Legend outside the plotting area, grid switched off.
    fig.update_layout(legend=dict(x=1.05, y=1))
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(showgrid=False)
    return fig


def ecg_analysis():
    """Analyse the uploaded patient file and build every output of the demo.

    Reads the first file found in ``current_ecg`` (columns "0".."139" hold
    the ECG samples, the remaining named columns the clinical data) and uses
    its first row for the ECG and decision-tree results.

    Returns:
        A tuple ``(latent_fig, reconstruction_fig, classifier_text,
        tree_fig, tree_text, llm_cluster_fig)`` in the order expected by the
        click handler of the UI.

    Raises:
        gr.Error: if no file has been uploaded or the file has no rows.
    """
    ecg_dir = "current_ecg"
    uploaded = sorted(os.listdir(ecg_dir)) if os.path.isdir(ecg_dir) else []
    if not uploaded:
        raise gr.Error("No ECG file available: upload a file before starting the analysis.")

    df = pd.read_csv(os.path.join(ecg_dir, uploaded[0]))
    if df.empty:
        raise gr.Error("The uploaded file does not contain any record.")

    # Normalize the signal with the training-set constants.
    df_data = df[[str(i) for i in range(140)]].values
    df_data = (df_data - min_val) / (max_val - min_val)
    df_data = tf.cast(df_data, tf.float32)

    df_tree = df[["ChestPainType", "ST_Slope"]].copy()  # decision-tree inputs
    df_llm = df[["Age", "Sex", "ChestPainType", "RestingBP", "Cholesterol",
                 "FastingBS", "RestingECG", "MaxHR", "ExerciseAngina",
                 "Oldpeak", "ST_Slope"]].copy()  # LLM inputs

    # ---------------- ECG analysis with the autoencoder --------------------
    heartbeat_encoder_preds = autoencoder.encoder(df_data).numpy()
    heartbeat_decoder_preds = autoencoder.decoder(heartbeat_encoder_preds).numpy()
    classification_res = classifier.predict(df_data)
    print("shapes of: encoder preds, decoder preds, classification preds\n",
          heartbeat_encoder_preds.shape, heartbeat_decoder_preds.shape,
          classification_res.shape)

    # Only the first row of the file is displayed.
    p_encoder_preds = heartbeat_encoder_preds[0, :]
    p_decoder_preds = heartbeat_decoder_preds[0, :]
    p_class_res = classification_res[0, :]

    fig = _latent_space_figure(p_encoder_preds)
    fig_reconstruction = _reconstruction_figure(df_data[0].numpy(), p_decoder_preds)

    # ---------------- Decision-tree analysis -------------------------------
    X_plot = _one_hot_tree_features(df_tree)
    y_score = decision_tree.predict_proba(X_plot)[:, 1]
    first_row = df_tree.iloc[0]
    fig_tree = _decision_tree_figure(
        _level_code(first_row["ST_Slope"], _SLOPE_LEVELS),
        _level_code(first_row["ChestPainType"], _CHEST_PAIN_LEVELS),
        f'prob={y_score[:1].round(3)}',
    )

    # ---------------- LLM analysis -----------------------------------------
    df_llm_encoding = df_encoding(df_llm)
    df_point_LLM = LLM_transform(df_llm_encoding)
    df_point_LLM.columns = [str(column) for column in df_point_LLM.columns]
    # transform() may return a plain array, which has no column labels; wrap
    # the result so it can be addressed by component name either way.
    pca_llm_point = pd.DataFrame(
        np.asarray(pca_2d_llm_clusters.transform(df_point_LLM)),
        columns=["comp1", "comp2"],
    )
    fig_llm_cluster = _llm_cluster_figure(pca_llm_point)

    return (
        fig,
        fig_reconstruction,
        f"Heart disease probability: {int(p_class_res[0]*100)} %",
        fig_tree,
        f"Heart disease probability: {int(y_score[0]*100)} %",
        fig_llm_cluster,
    )


# ---------------------------------------------------------------------------
# Demo app
# ---------------------------------------------------------------------------
with gr.Blocks(title="TIQUE - AI DEMO CAPABILITIES") as demo:
    # NOTE(review): the title literal spanned several lines inside ordinary
    # quotes (a syntax error); its text is kept and the line breaks escaped.
    # Any markup that originally surrounded it is not recoverable here.
    gr.Markdown("\n\nTIQUE: AI DEMO CAPABILITIES\n\n")

    with gr.Row():
        pazienti = ["Elisabeth Smith", "Michael Mims"]
        menu_pazienti = gr.Dropdown(choices=pazienti, label="patients")
        available_ecg_result = gr.Textbox()
        menu_pazienti.input(ecg_availability, inputs=[menu_pazienti],
                            outputs=[available_ecg_result])

    with gr.Row():
        # The trailing emoji was mis-encoded in the file; written as a named
        # escape so it no longer depends on the file encoding.
        # NOTE(review): FILE FOLDER is the most plausible original — confirm.
        input_file = gr.UploadButton(
            "Upload patient's data and latest ECG \N{FILE FOLDER}")
        text_upload_results = gr.Textbox()
        input_file.upload(upload_ecg, inputs=[input_file],
                          outputs=text_upload_results)

    with gr.Row():
        ecg_start_analysis_button = gr.Button(value="Start data analysis", scale=1)

    gr.Markdown("## Patient positioning on clusters")
    with gr.Row():
        llm_cluster = gr.Plot()

    gr.Markdown("## ECG analysis:")
    # NOTE(review): widget nesting below is reconstructed from the order of
    # the statements; confirm against the intended layout.
    with gr.Row():
        with gr.Column():
            latent_space_representation = gr.Plot()
        with gr.Column():
            autoencoder_ecg_reconstruction = gr.Plot()
            classifier_nn_prediction = gr.Textbox()

    gr.Markdown("## Patient's classification based on Chest Pain Type and ST Slope:")
    with gr.Row():
        decision_tree_plot = gr.Plot()
        decision_tree_proba = gr.Textbox()

    ecg_start_analysis_button.click(
        fn=ecg_analysis,
        inputs=None,
        outputs=[latent_space_representation, autoencoder_ecg_reconstruction,
                 classifier_nn_prediction, decision_tree_plot,
                 decision_tree_proba, llm_cluster],
    )

if __name__ == "__main__":
    demo.launch()