job-fair / util /evaluation.py
Zekun Wu
update
3e74664
raw
history blame
No virus
7.59 kB
import pandas as pd
import numpy as np
from scikit_posthocs import posthoc_nemenyi
from scipy import stats
from scipy.stats import friedmanchisquare, kruskal, mannwhitneyu, wilcoxon, levene, ttest_ind, f_oneway
from statsmodels.stats.multicomp import MultiComparison
from scipy.stats import spearmanr, pearsonr, kendalltau, entropy
from scipy.spatial.distance import jensenshannon
from scipy.stats import ttest_ind, friedmanchisquare, rankdata, ttest_rel
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from scipy.stats import ttest_1samp
def calculate_impact_ratio(selection_rates):
"""Calculate the impact ratio for each category."""
most_selected_rate = max(selection_rates.values())
impact_ratios = {category: rate / most_selected_rate for category, rate in selection_rates.items()}
return impact_ratios
def statistical_parity_difference(selection_rates):
"""Calculate statistical parity difference."""
most_selected_rate = max(selection_rates.values())
spd = {category: rate - most_selected_rate for category, rate in selection_rates.items()}
return spd
def calculate_four_fifths_rule(impact_ratios):
"""Calculate whether each category meets the four-fifths rule."""
adverse_impact = {category: (ratio < 0.8) for category, ratio in impact_ratios.items()}
return adverse_impact
def statistical_tests(data):
"""Perform various statistical tests to evaluate potential biases."""
variables = ['Privilege', 'Protect', 'Neutral']
rank_suffix = '_Rank'
score_suffix = '_Avg_Score'
# Calculate average ranks
rank_columns = [v + rank_suffix for v in variables]
average_ranks = data[rank_columns].mean()
average_scores = data[[v + score_suffix for v in variables]].mean()
# Statistical tests
rank_data = [data[col] for col in rank_columns]
# Pairwise tests
pairs = [
('Privilege', 'Protect'),
('Protect', 'Neutral'),
('Privilege', 'Neutral')
]
pairwise_results = {
'Wilcoxon Test': {}
}
for (var1, var2) in pairs:
pair_name_score = f'{var1}{score_suffix} vs {var2}{score_suffix}'
pair_rank_score = f'{var1}{rank_suffix} vs {var2}{rank_suffix}'
# Wilcoxon Signed-Rank Test
if len(data) > 20:
wilcoxon_stat, wilcoxon_p = wilcoxon(data[f'{var1}{rank_suffix}'], data[f'{var2}{rank_suffix}'])
else:
wilcoxon_stat, wilcoxon_p = np.nan, "Sample size too small for Wilcoxon test."
pairwise_results['Wilcoxon Test'][pair_rank_score] = {"Statistic": wilcoxon_stat, "p-value": wilcoxon_p}
# Levene's Test for Equality of Variances
levene_results = {}
levene_privilege_protect = levene(data['Privilege_Rank'], data['Protect_Rank'])
levene_privilege_neutral = levene(data['Privilege_Rank'], data['Neutral_Rank'])
levene_protect_neutral = levene(data['Protect_Rank'], data['Neutral_Rank'])
levene_results['Privilege vs Protect'] = {"Statistic": levene_privilege_protect.statistic,
"p-value": levene_privilege_protect.pvalue}
levene_results['Privilege vs Neutral'] = {"Statistic": levene_privilege_neutral.statistic,
"p-value": levene_privilege_neutral.pvalue}
levene_results['Protect vs Neutral'] = {"Statistic": levene_protect_neutral.statistic,
"p-value": levene_protect_neutral.pvalue}
# Calculate variances for ranks
variances = {col: data[col].var() for col in rank_columns}
pairwise_variances = {
'Privilege_Rank vs Protect_Rank': variances['Privilege_Rank'] > variances['Protect_Rank'],
'Privilege_Rank vs Neutral_Rank': variances['Privilege_Rank'] > variances['Neutral_Rank'],
'Protect_Rank vs Neutral_Rank': variances['Protect_Rank'] > variances['Neutral_Rank']
}
selection_rates = {
'Privilege': data['Privilege_Rank'].mean(),
'Protect': data['Protect_Rank'].mean(),
'Neutral': data['Neutral_Rank'].mean()
}
impact_ratios = calculate_impact_ratio(selection_rates)
spd_result = statistical_parity_difference(selection_rates)
adverse_impact = calculate_four_fifths_rule(impact_ratios)
# Friedman test
friedman_stat, friedman_p = friedmanchisquare(*rank_data)
rank_matrix = data[rank_columns].values
rank_matrix_transposed = np.transpose(rank_matrix)
posthoc_results = posthoc_nemenyi(rank_matrix_transposed)
#posthoc_results = posthoc_friedman(data, variables, rank_suffix)
results = {
"Average Ranks": average_ranks.to_dict(),
"Average Scores": average_scores.to_dict(),
"Friedman Test": {
"Statistic": friedman_stat,
"p-value": friedman_p,
"Post-hoc": posthoc_results
},
**pairwise_results,
"Levene's Test for Equality of Variances": levene_results,
"Pairwise Comparisons of Variances": pairwise_variances,
"Statistical Parity Difference": spd_result,
"Disparate Impact Ratios": impact_ratios,
"Four-Fifths Rule": adverse_impact,
}
return results
def hellinger_distance(p, q):
"""Calculate the Hellinger distance between two probability distributions."""
return np.sqrt(0.5 * np.sum((np.sqrt(p) - np.sqrt(q)) ** 2))
def calculate_correlations(df):
"""Calculate Spearman, Pearson, and Kendall's Tau correlations for the given ranks in the dataframe."""
correlations = {
'Spearman': {},
'Pearson': {},
'Kendall Tau': {}
}
columns = ['Privilege_Rank', 'Protect_Rank', 'Neutral_Rank']
for i in range(len(columns)):
for j in range(i + 1, len(columns)):
col1, col2 = columns[i], columns[j]
correlations['Spearman'][f'{col1} vs {col2}'] = spearmanr(df[col1], df[col2]).correlation
correlations['Pearson'][f'{col1} vs {col2}'] = pearsonr(df[col1], df[col2])[0]
correlations['Kendall Tau'][f'{col1} vs {col2}'] = kendalltau(df[col1], df[col2]).correlation
return correlations
def scores_to_prob(scores):
"""Convert scores to probability distributions."""
value_counts = scores.value_counts()
probabilities = value_counts / value_counts.sum()
full_prob = np.zeros(int(scores.max()) + 1)
full_prob[value_counts.index.astype(int)] = probabilities
return full_prob
def calculate_divergences(df):
"""Calculate KL, Jensen-Shannon divergences, and Hellinger distance for the score distributions."""
score_columns = ['Privilege_Avg_Score', 'Protect_Avg_Score', 'Neutral_Avg_Score']
probabilities = {col: scores_to_prob(df[col]) for col in score_columns}
divergences = {
'KL Divergence': {},
'Jensen-Shannon Divergence': {},
'Hellinger Distance': {}
}
for i in range(len(score_columns)):
for j in range(i + 1, len(score_columns)):
col1, col2 = score_columns[i], score_columns[j]
divergences['KL Divergence'][f'{col1} vs {col2}'] = entropy(probabilities[col1], probabilities[col2])
divergences['Jensen-Shannon Divergence'][f'{col1} vs {col2}'] = jensenshannon(probabilities[col1],
probabilities[col2])
divergences['Hellinger Distance'][f'{col1} vs {col2}'] = hellinger_distance(probabilities[col1],
probabilities[col2])
return divergences