# FSALA / src.py
import csv
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.stats import f_oneway
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LassoLarsCV
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix
class MyModel:
def __init__(self, model):
self.clf = model
self.scaler = None
self.label_encoder = None
def train(self, X, Y):
        # encode labels as integers
self.label_encoder = LabelEncoder()
Y = self.label_encoder.fit_transform(Y)
        # standardize the features
self.scaler = StandardScaler()
X = self.scaler.fit_transform(X)
        # split into training and test sets; only the training split is used for fitting
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3)
        # fit the classifier
self.clf.fit(X_train, Y_train)
def predict_samples(self, samples):
        # apply the same preprocessing used during training
samples = self.scaler.transform(samples)
        # predict with the trained classifier
predictions = self.clf.predict(samples)
        # decode predicted labels back to the original values
predictions = self.label_encoder.inverse_transform(predictions)
return predictions
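# Hypothetical usage sketch for MyModel (not called by the app): fit on synthetic
# data and predict a few samples; shapes, labels and values below are illustrative only.
def _demo_mymodel():
    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(40, 5))
    y_demo = np.repeat(['healthy', 'AD'], 20)
    model = MyModel(SVC(C=1.0, kernel='rbf'))
    model.train(X_demo, y_demo)               # fits encoder, scaler and classifier
    return model.predict_samples(X_demo[:3])  # labels decoded back to strings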
# choose classifier
def setclf(clf_name):
if clf_name == 'RF':
return RandomForestClassifier(n_jobs=-1)
elif clf_name == 'KNN':
return KNeighborsClassifier(n_jobs=-1)
elif clf_name == 'DT':
return DecisionTreeClassifier()
elif clf_name == 'SVM':
return SVC(C=1.0, kernel='rbf')
    elif clf_name == 'Naive Bayes':
        return GaussianNB()
    raise ValueError(f"Unsupported classifier: {clf_name}")
# record the highest-scoring remaining feature
def add_max_score_to_list(temp_scores, current_score, selected_indices, selected_indices_list):
max_score_index = np.argmax(np.array(temp_scores))
current_score.append(temp_scores[max_score_index])
selected_indices.add(max_score_index)
selected_indices_list.append(max_score_index)
# load data
def load_data(data, out_name):
# global X, y
data = pd.read_csv(data.name)
if not out_name:
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
    else:
X = data.iloc[:, :-1]
y = data.iloc[:, -1].values.flatten()
return X, y
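# Note on the expected CSV layout (inferred from the slicing above): every column
# except the last holds a feature and the last column holds the class label.
# With out_name=False, X is returned as a plain ndarray; with out_name=True, X stays
# a DataFrame so that column names remain available (as used by the Lasso branch).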
def MRMR_FCD(data, testsample, num_fea_int):
X, y = load_data(data, False)
    # load test samples and labels from the test CSV
    test_samples, test_labels = load_data(testsample, False)
    # number of features
# max_fea_num = X.shape[1]
num_features = len(X[0])
f_test_scores = [f_oneway(X[:, i], y)[0] for i in range(num_features)]
    # add the starting feature's score to current_score
    current_score = [max(f_test_scores)]
    # start from the feature with the highest F-score
    start_feature_index = f_test_scores.index(max(f_test_scores))
selected_indices = set()
selected_indices_list = []
selected_indices.add(start_feature_index)
selected_indices_list.append(start_feature_index)
pearson_score_matrix = np.zeros((num_features, num_features))
for _ in range(num_fea_int - 1):
temp_scores = []
for i in range(num_features):
if i in selected_indices:
temp_scores.append(-float('inf'))
else:
f_test_score = f_test_scores[i]
diff = 0
for j in selected_indices:
# pearson score
if j > i:
if pearson_score_matrix[i][j] == 0:
pearson_score_matrix[i][j] = np.corrcoef(X[:, i], X[:, j])[0, 1]
diff += pearson_score_matrix[i][j]
else:
if pearson_score_matrix[j][i] == 0:
pearson_score_matrix[j][i] = np.corrcoef(X[:, i], X[:, j])[0, 1]
diff += pearson_score_matrix[j][i]
temp_scores.append(f_test_score - diff / len(selected_indices))
add_max_score_to_list(temp_scores, current_score, selected_indices, selected_indices_list)
combined = list(zip(selected_indices_list, current_score))
return combined, X, y, test_samples, test_labels
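# MRMR_FCD selection criterion, as implemented above: at each step the candidate
# feature i maximizing
#     score(i) = F(i) - (1 / |S|) * sum_{j in S} pearson(x_i, x_j)
# is added, where F(i) is the f_oneway statistic of feature i against the labels,
# S is the set of already selected features, and the Pearson correlations are
# cached in pearson_score_matrix (F-test Correlation Difference).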
def MRMR_FCQ(data, testsample, num_fea_int):
X, y = load_data(data, False)
    # load test samples and labels from the test CSV
    test_samples, test_labels = load_data(testsample, False)
    # number of features
# max_fea_num = X.shape[1]
    num_features = len(X[0])
    f_test_scores = [f_oneway(X[:, i], y)[0] for i in range(num_features)]
    # add the starting feature's score to current_score
    current_score = [max(f_test_scores)]
    # (alternative) start from a random feature index:
    # start_feature_index = random.randint(0, num_features - 1)
    # start from the feature with the highest F-score
    start_feature_index = f_test_scores.index(max(f_test_scores))
selected_indices = set()
selected_indices_list = []
selected_indices.add(start_feature_index)
selected_indices_list.append(start_feature_index)
    pearson_score_matrix = np.zeros((num_features, num_features))
for _ in range(num_fea_int - 1):
temp_scores = []
        for i in range(num_features):
if i in selected_indices:
temp_scores.append(-float('inf'))
else:
f_test_score = f_test_scores[i]
q = 0
for j in selected_indices:
# pearson score
if j > i:
if pearson_score_matrix[i][j] == 0:
pearson_score_matrix[i][j] = np.corrcoef(X[:, i], X[:, j])[0, 1]
q += pearson_score_matrix[i][j]
else:
if pearson_score_matrix[j][i] == 0:
pearson_score_matrix[j][i] = np.corrcoef(X[:, i], X[:, j])[0, 1]
q += pearson_score_matrix[j][i]
temp_scores.append(f_test_score / (q / len(selected_indices)))
add_max_score_to_list(temp_scores, current_score, selected_indices, selected_indices_list)
combined = list(zip(selected_indices_list, current_score))
return combined, X, y, test_samples, test_labels
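# MRMR_FCQ differs from MRMR_FCD only in the criterion: the redundancy term divides
# rather than subtracts, i.e.
#     score(i) = F(i) / ((1 / |S|) * sum_{j in S} pearson(x_i, x_j))
# (F-test Correlation Quotient).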
def index_score_csv(sorted_combined, filename):
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["Index", "Score"]) # 写入列名
writer.writerows(sorted_combined)
def isplot(num, width, height, title_gr, x, y, xlabbel, ylabel, filename):
plt.figure(num=num, figsize=(width, height))
plt.title(title_gr, fontsize=30)
plt.plot(x, y)
plt.xlabel(xlabel=xlabbel, fontsize=30)
plt.ylabel(ylabel=ylabel, fontsize=30)
    plt.savefig(filename)
    plt.close()
def ifsplot(num, width, height, title_gr, max_index, max_acc, acc, xlabbel, ylabel, filename):
plt.figure(num=num, figsize=(width, height))
plt.title("IFS_" + title_gr + "_Accuracy", fontsize=40)
plt.plot(max_index, max_acc, 'ro')
    plt.plot(range(1, len(acc) + 1), acc)
plt.annotate(f'({max_index}, {max_acc})', (max_index, max_acc), textcoords="offset points", xytext=(-5, 20),
ha='center', fontsize=40)
    # set x- and y-axis labels
plt.xlabel(xlabel=xlabbel, fontsize=40)
plt.ylabel(ylabel=ylabel, fontsize=40)
    plt.savefig(filename)
    plt.close()
def cmplot(num, width, height, cm, xlabbel, ylabel, filename):
plt.figure(num=num, figsize=(width, height))
sns.heatmap(cm, annot=True, fmt='d')
plt.xlabel(xlabel=xlabbel, fontsize=40)
    plt.ylabel(ylabel=ylabel, fontsize=40)
    plt.grid(True)
    plt.savefig(filename)
    plt.close()
def des(choicce):
title = "FSALs: Robust Feature selection framework"
description = r"""<center><img src='https://raw.githubusercontent.com/Justin-12138/bio_if/d1fdf085f8e679dcceecc2c05014b1d4a237e033/assets/favicon.svg' alt='FSALs logo'></center>
<b>Official Gradio demo</b> for <a href='https://huggingface.co/spaces/Justin-12138/FSALA' target='_blank'><b>Application of Causal Inference in Alzheimer's Disease (CCFC2023)</b></a>.<br>
🔥 FSALs is a robust feature selection framework based on causal inference. <br>
🤗 Try FSALs on different datasets!<br>
"""
article = r"""
If FSALs is helpful, please help to ⭐ the <a href='https://github.com/Justin-12138/bio_if' target='_blank'>Github Repo</a>. Thanks!
[![GitHub Stars](https://img.shields.io/github/stars/Justin-12138/bio_if?style=social)](https://github.com/Justin-12138/bio_if)
---
📝 **Citation**
If our work is useful for your research, please consider citing:
```bibtex
@article{zlhl2023,
author = {Xiaolong Zhou and Zhao Liu and Yuchen Huang and Kun Lin},
title = {A Novel Ensemble Feature Selection Method for Biomarkers of Alzheimer's disease},
booktitle = {GUET Publisher},
year = {2023}
}
```
📋 **License**
This project is licensed under the <a rel="license" href="https://github.com/Justin-12138/bio_if/blob/main/LICENSE">GPL-2.0 License</a>.
Redistribution and use for non-commercial purposes should follow this license.
📧 **Contact**
If you have any questions, please feel free to reach out to me at <b>justinliu707@gmail.com</b>.
<div>
🤗 Find Me:
<a href="https://github.com/Justin-12138"><img style="margin-top:0.5em; margin-bottom:2em" src="https://img.shields.io/github/followers/Justin-12138?style=social" alt="Github Follow"></a>
</div>
"""
if choicce == "title":
return title
elif choicce == "description":
return description
elif choicce == "article":
return article
elif choicce == 'inputs':
inputs = [gr.inputs.File(label="Training data"),
gr.inputs.Radio(['MRMR_FCD', 'MRMR_FCQ', 'CFS', 'Lasso', 'Ensemble', 'CI'], label="method"),
gr.inputs.Number(label="Num_feature(int)"),
gr.inputs.Radio(['RF', 'SVM', 'KNN', 'DT', 'Naive Bayes'], label="classifier for CV"),
gr.inputs.File(label="Testing data")
]
return inputs
elif choicce == 'outputs':
output = [gr.Image(label="Index_score"),
gr.Image(label="IFS_Acc"),
gr.Image(label="Confusion_matrix"),
gr.File(label='Index_score.csv')]
return output
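# A minimal sketch (an assumption; the actual wiring lives outside this file) of how
# des() and fs() would be combined into a Gradio app:
def _demo_interface():
    return gr.Interface(fn=fs,
                        inputs=des('inputs'),
                        outputs=des('outputs'),
                        title=des('title'),
                        description=des('description'),
                        article=des('article'))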
def cv(X, y, index_0, clf, n_fold):
acc = []
for i in range(len(index_0)):
        # cross-validate using the top (i + 1) ranked features
        selected_features = X[:, [int(j) for j in index_0[:i + 1]]]
        scores = cross_val_score(clf, selected_features, y, cv=n_fold)
        # append the mean accuracy across folds
acc.append(scores.mean())
max_acc = round(max(acc), 4)
max_index = acc.index(max(acc)) + 1
return acc, max_acc, max_index
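# Hypothetical sketch of cv() on synthetic data (incremental feature selection:
# the k-th accuracy uses the k top-ranked feature columns). Values are illustrative.
def _demo_cv():
    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(60, 6))
    y_demo = np.repeat([0, 1], 30)
    ranking = [2, 0, 5, 1]  # 0-based column indices, best first
    return cv(X_demo, y_demo, ranking, setclf('KNN'), n_fold=5)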
def getindex_1(sorted_combined):
    index_1 = []
    index_0 = []
    scores = []
    for idx, score in sorted_combined:
        index_1.append(str(idx + 1))  # 1-based labels used for plotting
        index_0.append(idx)           # 0-based column indices used for slicing
        scores.append(score)
    return index_1, index_0, scores
def load_model(X, y, test_samples, test_labels):
models = SVC(C=1.0, kernel='rbf')
my_model = MyModel(models)
my_model.train(X, y)
    # predict labels for the test samples
    predictions = my_model.predict_samples(test_samples)
    # compute the confusion matrix against the true test labels
    cm = confusion_matrix(test_labels, predictions)
return cm
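# Note: the confusion matrix above is always produced with an SVC trained inside
# load_model(), independently of the classifier chosen for cross-validation.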
def lasso(data, testsample, num_fea_int):
X, y = load_data(data, True)
test_samples, test_labels = load_data(testsample, False)
cl = LassoLarsCV(cv=20, max_iter=80000).fit(X, y)
importance = np.abs(cl.coef_)
feature_names = list(X)
a = len(feature_names)
idx_features = (-importance).argsort()[:a]
# name_features = np.array(feature_names)[idx_features]
result = pd.DataFrame({'index': idx_features, 'Score': importance[idx_features]})
result_rank = result.sort_values(by='Score', ascending=False, ignore_index=True)
result_rank.to_csv("index-score.csv")
inde = result_rank['index'].tolist()
score = result_rank['Score'].tolist()
return X, y, inde, score, test_samples, test_labels, num_fea_int
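# The Lasso branch ranks features by the absolute value of the LassoLarsCV
# coefficients; 'index' in index-score.csv is the 0-based column position of each
# feature, sorted by descending score.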
def fs(data, method, num_fea_int, clf, testsample):
num_fea_int = int(num_fea_int)
if method == 'MRMR_FCD':
combined, X, y, test_samples, test_labels = MRMR_FCD(data=data, testsample=testsample, num_fea_int=num_fea_int)
        # sort the (index, score) pairs by score in descending order
sorted_combined = sorted(combined, key=lambda x: x[1], reverse=True)
index_score_csv(sorted_combined=sorted_combined, filename='ab.csv')
index_1, index_0, scores = getindex_1(sorted_combined=sorted_combined)
        # plot index-score.png
isplot(1, 24, 10,
title_gr=str(method), x=index_1, y=scores,
xlabbel="index", ylabel="scores", filename="index-score.png")
        # choose the classifier for cross-validation
clf = setclf(clf)
acc, max_acc, max_index = cv(X=X, y=y, index_0=index_0, clf=clf, n_fold=10)
        # plot acc.png
ifsplot(2, 24, 10,
title_gr=str(method), max_index=max_index, max_acc=max_acc,
acc=acc, xlabbel="top n features", ylabel="acc", filename="acc.png")
cm = load_model(X=X, y=y, test_samples=test_samples, test_labels=test_labels)
cmplot(3, 24, 10, cm=cm,
xlabbel="predicted labels", ylabel="true labels", filename='confusion_matrix.png')
return 'index-score.png', 'acc.png', "confusion_matrix.png", "ab.csv"
elif method == 'MRMR_FCQ':
combined, X, y, test_samples, test_labels = MRMR_FCQ(data=data, testsample=testsample, num_fea_int=num_fea_int)
        # sort the (index, score) pairs by score in descending order
sorted_combined = sorted(combined, key=lambda x: x[1], reverse=True)
index_score_csv(sorted_combined=sorted_combined, filename='ab.csv')
        # index_1 holds 1-based feature labels, index_0 the 0-based column indices
index_1, index_0, scores = getindex_1(sorted_combined=sorted_combined)
# index-score.png
isplot(1, 24, 10, title_gr=str(method), x=index_1, y=scores,
xlabbel="index", ylabel="scores", filename="index-score.png")
        # choose the classifier for cross-validation
clf = setclf(clf)
acc, max_acc, max_index = cv(X=X, y=y, index_0=index_0, clf=clf, n_fold=5)
# acc.png
ifsplot(2, 24, 10, title_gr=str(method), max_index=max_index,
max_acc=max_acc, acc=acc, xlabbel="top n features", ylabel="acc",
filename="acc.png")
        # compute the confusion matrix
cm = load_model(X=X, y=y, test_samples=test_samples, test_labels=test_labels)
cmplot(3, 24, 10,
cm=cm, xlabbel="predicted labels", ylabel="true labels", filename='confusion_matrix.png')
return 'index-score.png', 'acc.png', "confusion_matrix.png", "ab.csv"
elif method == 'Lasso':
X, y, inde, score, test_samples, test_labels, num_fea_int = lasso(data, testsample, num_fea_int)
index = []
for i in inde:
index.append(str(i))
plt.figure(1, figsize=(24, 12))
plt.title(str(method))
plt.plot(index[:num_fea_int], score[:num_fea_int])
        # set x- and y-axis labels
plt.xlabel('Feature Index', fontsize=40)
plt.ylabel('Feature Score', fontsize=40)
plt.savefig('Index_Score.png')
clf = setclf(clf)
inde = inde[:num_fea_int]
X = X.values
acc, max_acc, max_index = cv(X=X, y=y, index_0=inde, clf=clf, n_fold=5)
ifsplot(2, 24, 10, title_gr=str(method), max_index=max_index,
max_acc=max_acc, acc=acc, xlabbel="top n features", ylabel="acc",
filename="acc.png")
cm = load_model(X=X, y=y, test_samples=test_samples, test_labels=test_labels)
cmplot(3, 24, 10,
cm=cm, xlabbel="predicted labels", ylabel="true labels", filename='confusion_matrix.png')
return 'Index_Score.png', 'acc.png', "confusion_matrix.png", 'index-score.csv'
elif method == 'CFS':
pass
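# Hypothetical smoke test (not part of the original Space): build a tiny synthetic
# CSV, wrap it in an object exposing a `.name` attribute the way a gradio file
# upload does, and run the MRMR_FCD pipeline end to end. The file name, sizes and
# column names below are illustrative assumptions.
if __name__ == "__main__":
    from types import SimpleNamespace

    rng = np.random.default_rng(42)
    demo = pd.DataFrame(rng.normal(size=(60, 8)), columns=[f"f{i}" for i in range(8)])
    demo["label"] = np.repeat([0, 1], 30)
    demo.to_csv("demo.csv", index=False)

    demo_file = SimpleNamespace(name="demo.csv")
    print(fs(demo_file, "MRMR_FCD", 5, "RF", demo_file))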