# Provenance (hosting-page residue): uploaded by Fancy-yousa, "Upload 78 files", commit b5567db (verified).
import sympy as sp
import numpy as np
from sklearn.metrics import mutual_info_score
# Symbolic math
import sympy as sp
import pandas as pd
# Formal symbols that formulas may refer to; they stand for the data columns
# supplied later when a parsed formula is evaluated.
X, Y, Z = map(sp.Symbol, ("X", "Y", "Z"))
class MI(sp.Function):
    """Unevaluated symbolic node for mutual information of two arguments."""

    # Arity accepted by this symbolic function: exactly two arguments.
    nargs = (2,)
class CMI(sp.Function):
    """Unevaluated symbolic node for conditional mutual information."""

    # Arity accepted by this symbolic function: exactly three arguments.
    nargs = (3,)
class II(sp.Function):
    """Unevaluated symbolic node for interaction information."""

    # Arity accepted by this symbolic function: exactly three arguments.
    nargs = (3,)
# Names a formula string may use; handed to sympify as its ``locals`` mapping.
# NOTE(review): binding "I" here is presumably meant to take the place of
# SymPy's own ``I`` (the imaginary unit) while parsing - confirm.
ALLOWED_LOCALS = dict(
    X=X,
    Y=Y,
    Z=Z,
    I=MI,    # I(X, Y): mutual information
    CI=CMI,  # CI(X, Y, Z): conditional mutual information I(X;Y|Z)
    II=II,   # II(X, Y, Z): interaction information I(X;Y;Z)
)
def parse_expression(expr_str: str) -> sp.Expr:
    """
    Turn a formula string into a SymPy expression.

    Names in the string are resolved through ALLOWED_LOCALS, so "I(X,Y)"
    becomes an MI node applied to the symbols X and Y.
    """
    # NOTE(review): sympify evaluates the string with eval(); only feed it
    # formulas from a trusted source.
    return sp.sympify(expr_str, locals=ALLOWED_LOCALS)
def entropy(x):
    """
    Shannon entropy (natural log, i.e. nats) of the empirical distribution of x.

    x is an array-like of discrete observations. A 1-D input is one variable.
    An input with more dimensions is read as one sample per row, and rows that
    are identical count as the same outcome, which yields a joint entropy.

    Returns -sum(p * log(p)) over the observed outcomes.
    """
    samples = np.asarray(x)
    if samples.ndim > 1:
        # Without an axis np.unique flattens its input, which would pool the
        # values of all columns together instead of counting distinct rows.
        rows = samples.reshape(samples.shape[0], -1)
        _, counts = np.unique(rows, axis=0, return_counts=True)
    else:
        _, counts = np.unique(samples, return_counts=True)
    probs = counts / counts.sum()
    # Every observed outcome has a count of at least 1, so probs > 0 and the
    # logarithm needs no smoothing term (one would bias the result and make
    # the entropy of a constant slightly negative).
    return -np.sum(probs * np.log(probs))
def mi(x, y):
    """
    Mutual information I(X;Y) of two discrete label sequences.

    Delegates to sklearn's mutual_info_score, which reports the value in nats.
    """
    score = mutual_info_score(x, y)
    return score
def cmi(x, y, z):
    """
    Conditional mutual information I(X;Y|Z) from joint entropies.

    Uses the identity I(X;Y|Z) = H(X,Z) + H(Y,Z) - H(Z) - H(X,Y,Z), where
    x, y and z are equally long sequences of discrete observations.
    """

    def _row_ids(table):
        # Give every distinct row one integer id so that a joint variable can
        # be passed on as a plain 1-D sequence. Handing over the 2-D rows
        # directly is unsafe: np.unique flattens multi-dimensional input
        # unless an axis is given, which pools the columns' values together.
        _, ids = np.unique(table, axis=0, return_inverse=True)
        return ids.ravel()

    h_xz = entropy(_row_ids(np.c_[x, z]))
    h_yz = entropy(_row_ids(np.c_[y, z]))
    h_z = entropy(z)
    h_xyz = entropy(_row_ids(np.c_[x, y, z]))
    return h_xz + h_yz - h_z - h_xyz
def interaction_info(x, y, z):
    """
    Interaction information of three discrete variables.

    Computed as I(X;Y;Z) = I(X;Y) - I(X;Y|Z).
    """
    unconditional = mi(x, y)
    conditional = cmi(x, y, z)
    return unconditional - conditional
def expr_to_callable(expr: sp.Expr):
    """
    Build a function that evaluates a parsed formula on concrete data.

    The returned function f(X_arr, Y_arr, Z_arr=None) walks the expression
    tree: MI / CMI / II nodes are computed with mi / cmi / interaction_info,
    the symbols X, Y, Z are replaced by the given arrays, numbers become
    floats, and sums, products and powers are combined arithmetically.

    A ValueError is raised for any other kind of node. Looking up Z when
    Z_arr was not supplied raises KeyError.
    """

    def _evaluate(node, ctx):
        # Information measures: evaluate the arguments left to right, then
        # apply the matching estimator.
        measures = ((MI, mi), (CMI, cmi), (II, interaction_info))
        for node_type, estimator in measures:
            if isinstance(node, node_type):
                operands = [_evaluate(arg, ctx) for arg in node.args]
                return estimator(*operands)

        # Leaf symbols resolve to the arrays supplied by the caller.
        for symbol, key in ((X, "X"), (Y, "Y"), (Z, "Z")):
            if node == symbol:
                return ctx[key]

        if node.is_Number:
            return float(node)

        if node.is_Add:
            return sum(_evaluate(term, ctx) for term in node.args)

        if node.is_Mul:
            product = 1.0
            for factor in node.args:
                product *= _evaluate(factor, ctx)
            return product

        if node.is_Pow:
            base, exponent = node.args
            return _evaluate(base, ctx) ** _evaluate(exponent, ctx)

        raise ValueError(f"Unsupported node: {node}")

    def f(X_arr, Y_arr, Z_arr=None):
        ctx = {"X": X_arr, "Y": Y_arr}
        # Z is optional; it is only registered when the caller provides it.
        if Z_arr is not None:
            ctx["Z"] = Z_arr
        return _evaluate(expr, ctx)

    return f
from sklearn.preprocessing import LabelEncoder
def changetosinge(x):
    """Convert a single value to a Python float."""
    as_float = float(x)
    return as_float
# scores = f(X, y, X_other_list)
def prepare_data(dataname, base_url):
    """
    Load <base_url>/<dataname>.mat and return a discretised feature table.

    The file is read with scipy.io.loadmat and must contain a feature matrix
    under the key 'X' and the labels under the key 'Y'. Every feature is cut
    into 5 equal-width bins and the labels are integer-encoded.

    Args:
        dataname: Dataset name, i.e. the file name without the '.mat' suffix.
        base_url: Directory that contains the file.

    Returns:
        A tuple (data_processed, classes). data_processed is a DataFrame whose
        columns are named '0' .. 'k-1' for the k binned features, followed by
        'k' for the encoded label. classes is the sorted list of distinct
        encoded labels.
    """
    url = os.path.join(base_url, dataname + '.mat')
    data = scio.loadmat(url)
    X0 = pd.DataFrame(data['X'])
    y0 = pd.DataFrame(data['Y'])

    if dataname == 'Dermatology':
        # The last feature column of this dataset holds wrapped values that
        # cannot go through float(); unwrap them and integer-encode the column
        # in place (no hard-coded column label, no assignment onto a slice).
        last = X0.columns[-1]
        unwrapped = np.array([item[0] for item in X0[last]])
        X0[last] = LabelEncoder().fit_transform(unwrapped)

    # NOTE(review): the float conversion is applied to every dataset; the
    # original indentation was lost, so confirm it was not meant to be
    # Dermatology-only.
    # DataFrame.applymap was renamed to DataFrame.map in pandas 2.1.
    elementwise = "map" if hasattr(pd.DataFrame, "map") else "applymap"
    X0 = getattr(X0, elementwise)(changetosinge)
    y0 = getattr(y0, elementwise)(changetosinge)

    # LabelEncoder expects a 1-D target. Flatten a single row/column label
    # matrix; any other shape is passed through so it is still rejected.
    labels = y0.to_numpy()
    if labels.ndim == 2 and 1 in labels.shape:
        labels = labels.reshape(-1)
    y_encoded = LabelEncoder().fit_transform(labels)

    # Bin every feature into 5 equal-width intervals (bin index as value) and
    # name the columns by position. Building the frame in one go avoids the
    # fragmentation caused by inserting columns one at a time.
    n_features = X0.shape[1]
    binned = {
        str(i): pd.cut(X0.iloc[:, i], bins=5, labels=False)
        for i in range(n_features)
    }
    X = pd.DataFrame(binned, index=X0.index)
    y = pd.DataFrame({str(n_features): y_encoded})

    data_processed = pd.concat([X, y], axis=1)
    # Sorted so that the class list has a deterministic order.
    return data_processed, sorted(set(y_encoded))
import os
import scipy.io as scio
dataname = 'Authorship'
base_url = '/home/fangsensen/AutoFS/data/'
data_processed, class_set = prepare_data(dataname, base_url)

# prepare_data returns the binned features first and the encoded label as the
# last column, so select by position instead of by a dataset-specific name.
# (These two assignments were commented out, which made the lines below fail
# with NameError.)
X_arr = data_processed.iloc[:, 0]
y_arr = data_processed.iloc[:, -1]
print(111111, X_arr, 2222222, y_arr)

# Score the first feature against the label with the formula I(X,Y).
expr = parse_expression("I(X,Y)")
f = expr_to_callable(expr)
score = f(X_arr, y_arr)
print(score)