"""
=========================================================================================
Trojan VQA
Written by Indranil Sur

Weight sensitivity analysis on last layers of TrojVQA clean and trojan models.
=========================================================================================
"""
import os | |
import copy | |
import json | |
import torch | |
import errno | |
import pandas as pd | |
import numpy as np | |
import argparse | |
from pathlib import Path | |
from xgboost import XGBClassifier | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.ensemble import RandomForestClassifier, StackingClassifier | |
from sklearn.svm import SVC | |
from sklearn.metrics import roc_curve,auc | |
from sklearn.model_selection import StratifiedKFold | |
# Base estimators that feed the stacked 'RF+SVC' ensemble below.
e1 = [
    ('rf', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('svr', SVC(kernel='linear', probability=True)),
]

# Registry of (name, unfitted estimator) pairs to evaluate.
# Every entry must expose fit() and predict_proba().
clfs = [
    (
        'XGB',
        XGBClassifier(
            eval_metric='mlogloss',
            use_label_encoder=False,
        ),
    ),
    (
        'XGB_2',
        XGBClassifier(
            max_depth=2,
            gamma=2,
            eta=0.8,
            reg_alpha=0.5,
            reg_lambda=0.5,
            eval_metric='mlogloss',
            use_label_encoder=False,
        ),
    ),
    (
        'LR',
        LogisticRegression(random_state=0, class_weight='balanced', C=1),
    ),
    (
        'RF',
        RandomForestClassifier(random_state=0),
    ),
    (
        'RF_10',
        RandomForestClassifier(n_estimators=10, random_state=42),
    ),
    (
        'SVC_l',
        SVC(kernel='linear', probability=True),
    ),
    (
        'SVC_r',
        SVC(kernel='rbf', probability=True),
    ),
    # Polynomial-kernel SVC is currently disabled:
    # ('SVC_p', SVC(kernel='poly', probability=True)),
    (
        'RF+SVC',
        StackingClassifier(estimators=e1, final_estimator=LogisticRegression()),
    ),
]
# Architecture names for which separate per-architecture results are computed.
model_archs = [
    'butd_eff',
    'mfb',
    'mfh',
    'mcan_small',
    'mcan_large',
    'mmnasnet_small',
    'mmnasnet_large',
    'ban_4',
    'ban_8',
    'butd',
]
def cross_entropy(prob, labels):
    """
    Mean binary cross-entropy between predicted probabilities and labels.

    prob:   predicted probabilities (array-like, e.g. numpy Nx1)
    labels: ground-truth labels (array-like, e.g. numpy Nx1)

    Both inputs are converted to torch tensors and have singleton
    dimensions squeezed away; their shapes must then match.
    Returns the mean per-sample loss as a Python float.
    """
    predicted, target = (torch.Tensor(arr).squeeze() for arr in (prob, labels))
    assert (
        predicted.shape == target.shape
    ), "Check size of labels and probabilities in computing cross-entropy"
    # Keep per-sample losses and average afterwards.
    per_sample = torch.nn.functional.binary_cross_entropy(
        predicted, target, reduction='none'
    )
    return torch.mean(per_sample).item()
def get_feature(metadata, root):
    """
    Load one saved feature array per model listed in the metadata.

    metadata: object whose `model_name` column provides the model ids
    root:     directory containing one `<model_name>.npy` file per model

    Returns a list of numpy arrays in the same order as `metadata.model_name`.
    """
    return [
        np.load('{}/{}.npy'.format(root, name))
        for name in metadata.model_name.to_list()
    ]
def get_measures(features_train, labels_train, features_test, labels_test, ret_ce=True, n_splits=5):
    """
    Fit every classifier in `clfs` and report test and cross-validation metrics.

    features_train / labels_train: training features and labels
    features_test / labels_test:   held-out features and labels
    ret_ce:   when True, also report cross-entropy next to the ROC AUC
    n_splits: number of stratified CV folds run on the training set;
              pass None to skip cross-validation entirely

    Returns a dict keyed by classifier name. Each value holds 'auc' (and 'ce'
    if ret_ce) for the test set and, when n_splits is not None,
    'cv_auc_mean' / 'cv_auc_std' (and 'cv_ce_mean' / 'cv_ce_std' if ret_ce)
    over the folds. Scores use column 1 of predict_proba.
    """
    # Fold indices are positional, so index plain numpy arrays rather than
    # whatever array-like (e.g. a pandas Series) the caller handed in.
    features_train = np.asarray(features_train)
    labels_train = np.asarray(labels_train)

    ret = {}
    for name, _clf in clfs:
        # Work on a copy so the module-level prototype stays unfitted.
        clf = copy.deepcopy(_clf)
        clf = clf.fit(features_train, labels_train)
        pred_test = clf.predict_proba(features_test)
        fpr, tpr, _ = roc_curve(labels_test, pred_test[:, 1])
        ret[name] = {'auc': auc(fpr, tpr)}
        if ret_ce:
            ret[name]['ce'] = cross_entropy(pred_test[:, 1], labels_test)

        if n_splits is None:
            continue

        # Honour the caller-supplied fold count (previously hard-coded to 5).
        kfold = StratifiedKFold(n_splits=n_splits, shuffle=False)
        cv_rocs = []
        cv_ces = []
        for train, test in kfold.split(features_train, labels_train):
            # Fresh copy per fold so no state leaks between folds.
            clf = copy.deepcopy(_clf)
            clf = clf.fit(features_train[train], labels_train[train])
            pred_fold = clf.predict_proba(features_train[test])
            fpr, tpr, _ = roc_curve(labels_train[test], pred_fold[:, 1])
            cv_rocs.append(auc(fpr, tpr))
            if ret_ce:
                cv_ces.append(cross_entropy(pred_fold[:, 1], labels_train[test]))

        ret[name]['cv_auc_mean'] = np.mean(cv_rocs)
        ret[name]['cv_auc_std'] = np.std(cv_rocs)
        if ret_ce:
            ret[name]['cv_ce_mean'] = np.mean(cv_ces)
            ret[name]['cv_ce_std'] = np.std(cv_ces)
    return ret
if __name__ == "__main__":
    # Script entry point: load per-model feature files and labels, evaluate
    # the shallow classifiers on all models and per architecture, and write
    # the metrics to <result>/<ds>.json.
    parser = argparse.ArgumentParser(description='Train shallow classifiers from wt features')
    parser.add_argument('--ds_root', type=str, help='Root of data', required=True)
    parser.add_argument('--ds', type=str, help='dataset', default='v1')
    parser.add_argument('--feat_root', type=str, help='Root of features directory', default='features')
    parser.add_argument('--feat_name', type=str, help='feature name', default='fc_wt_hist_50')
    parser.add_argument('--result', type=str, help='Directory for the result JSON file', default='result')
    args = parser.parse_args()

    # Dataset splits live in <ds_root>/<ds>-train-dataset and <ds>-test-dataset.
    root_train = Path(args.ds_root)/'{}-train-dataset/'.format(args.ds)
    root_test = Path(args.ds_root)/'{}-test-dataset/'.format(args.ds)
    metadata_train = pd.read_csv(root_train/'METADATA.csv')
    metadata_test = pd.read_csv(root_test/'METADATA.csv')

    # Features are stored as <feat_root>/<ds>/<feat_name>/{train,test}/<model_name>.npy
    feature_dir_train = Path(args.feat_root)/args.ds/args.feat_name/'train'
    feature_dir_test = Path(args.feat_root)/args.ds/args.feat_name/'test'
    feature_lst_train = get_feature(metadata_train, feature_dir_train)
    feature_lst_test = get_feature(metadata_test, feature_dir_test)
    features_train = np.stack(feature_lst_train)
    features_test = np.stack(feature_lst_test)

    # Labels come from the `d_clean` metadata column.
    labels_train = metadata_train.d_clean.to_numpy()
    labels_test = metadata_test.d_clean.to_numpy()

    # Create the output directory; an existing directory is fine, any other
    # failure (permissions, bad path) is raised instead of silently ignored.
    os.makedirs(args.result, exist_ok=True)
    out_file = Path(args.result)/'{}.json'.format(args.ds)

    all_results = {}
    all_results['ALL'] = get_measures(features_train, labels_train, features_test, labels_test)

    for model in model_archs:
        # Compute each boolean mask once and reuse it for features and labels.
        mask_train = (metadata_train.model == model).to_numpy()
        mask_test = (metadata_test.model == model).to_numpy()
        if not mask_train.any() or not mask_test.any():
            # Nothing to fit or evaluate for this architecture; skip it so
            # the results already computed are still written out.
            print('Skipping {}: no train/test samples for this architecture'.format(model))
            continue
        all_results[model] = get_measures(
            features_train[mask_train],
            labels_train[mask_train],
            features_test[mask_test],
            labels_test[mask_test],
        )

    with open(out_file, 'w') as outfile:
        json.dump(all_results, outfile, indent=4)