"""
=========================================================================================
Trojan VQA
Written by Indranil Sur
Weight sensitivity analysis on last layers of TrojVQA clean and trojan models.
=========================================================================================
"""
import os
import copy
import json
import torch
import errno
import pandas as pd
import numpy as np
import argparse
from pathlib import Path
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.svm import SVC
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import StratifiedKFold

# Base estimators for the stacked RF+SVC classifier
e1 = [
    ('rf', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('svr', SVC(kernel='linear', probability=True)),
]

# List of shallow classifiers to test
clfs = [
    ('XGB', XGBClassifier(eval_metric='mlogloss', use_label_encoder=False)),
    ('XGB_2', XGBClassifier(max_depth=2, gamma=2, eta=0.8, reg_alpha=0.5, reg_lambda=0.5,
                            eval_metric='mlogloss', use_label_encoder=False)),
    ('LR', LogisticRegression(random_state=0, class_weight='balanced', C=1)),
    ('RF', RandomForestClassifier(random_state=0)),
    ('RF_10', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('SVC_l', SVC(kernel='linear', probability=True)),
    ('SVC_r', SVC(kernel='rbf', probability=True)),
    # ('SVC_p', SVC(kernel='poly', probability=True)),
    ('RF+SVC', StackingClassifier(estimators=e1, final_estimator=LogisticRegression())),
]

# List of all the model architectures in the dataset
model_archs = ['butd_eff', 'mfb', 'mfh', 'mcan_small', 'mcan_large',
               'mmnasnet_small', 'mmnasnet_large', 'ban_4', 'ban_8', 'butd']


def cross_entropy(prob, labels):
    """
    Compute the mean binary cross-entropy.
    prob: probabilities from the model (numpy: Nx1)
    labels: ground-truth labels (numpy: Nx1)
    """
    prob = torch.Tensor(prob).squeeze()
    labels = torch.Tensor(labels).squeeze()
    assert (
        prob.shape == labels.shape
    ), "Check size of labels and probabilities in computing cross-entropy"
    ce = torch.nn.functional.binary_cross_entropy(prob, labels, reduction='none')
    return ce.mean().item()
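
# Worked example (values are illustrative, not from the original): for
# predicted probabilities [0.9, 0.1] with labels [1, 0], each term is
# -ln(0.9) ~= 0.105, so
#   cross_entropy(np.array([0.9, 0.1]), np.array([1.0, 0.0])) ~= 0.105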


def get_feature(metadata, root):
    """Load the per-model weight-feature vectors listed in the metadata."""
    feature_lst = []
    for model_id in metadata.model_name.to_list():
        feat = np.load('{}/{}.npy'.format(root, model_id))
        feature_lst.append(feat)
    return feature_lst
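
# Assumed on-disk layout, inferred from the path construction in this script
# (one .npy feature vector per model):
#   <feat_root>/<ds>/<feat_name>/train/<model_name>.npy
# where <model_name> comes from the 'model_name' column of METADATA.csv.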


def get_measures(features_train, labels_train, features_test, labels_test, ret_ce=True, n_splits=5):
    """
    For each shallow classifier: fit on the train split, report test AUC
    (and cross-entropy), and add stratified k-fold CV statistics on the
    train split when n_splits is not None.
    """
    ret = {}
    for name, _clf in clfs:
        # Fit a fresh copy on the full training split and score the test split
        clf = copy.deepcopy(_clf)
        clf = clf.fit(features_train, labels_train)
        pred_test = clf.predict_proba(features_test)
        fpr, tpr, _ = roc_curve(labels_test, pred_test[:, 1])
        roc_auc = auc(fpr, tpr)
        # Cast numpy scalars to float so the results stay JSON-serializable
        ret[name] = {'auc': float(roc_auc)}
        if ret_ce:
            ret[name]['ce'] = cross_entropy(pred_test[:, 1], labels_test)
        if n_splits is not None:
            kfold = StratifiedKFold(n_splits=n_splits, shuffle=False)
            cv_rocs = []
            cv_ces = []
            for train, test in kfold.split(features_train, labels_train):
                clf = copy.deepcopy(_clf)
                clf = clf.fit(features_train[train], labels_train[train])
                pred_test = clf.predict_proba(features_train[test])
                fpr, tpr, _ = roc_curve(labels_train[test], pred_test[:, 1])
                roc_auc = auc(fpr, tpr)
                cv_rocs.append(roc_auc)
                if ret_ce:
                    ce = cross_entropy(pred_test[:, 1], labels_train[test])
                    cv_ces.append(ce)
            ret[name]['cv_auc_mean'] = float(np.mean(cv_rocs))
            ret[name]['cv_auc_std'] = float(np.std(cv_rocs))
            if ret_ce:
                ret[name]['cv_ce_mean'] = float(np.mean(cv_ces))
                ret[name]['cv_ce_std'] = float(np.std(cv_ces))
    return ret
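
# Sketch of one classifier's entry in the returned dict (values illustrative):
#   ret['XGB'] = {'auc': 0.91, 'ce': 0.35,
#                 'cv_auc_mean': 0.88, 'cv_auc_std': 0.04,
#                 'cv_ce_mean': 0.40, 'cv_ce_std': 0.05}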


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train shallow classifiers from weight features')
    parser.add_argument('--ds_root', type=str, help='Root of data', required=True)
    parser.add_argument('--ds', type=str, help='dataset version', default='v1')
    parser.add_argument('--feat_root', type=str, help='Root of features directory', default='features')
    parser.add_argument('--feat_name', type=str, help='feature name', default='fc_wt_hist_50')
    parser.add_argument('--result', type=str, help='output directory for results', default='result')
    args = parser.parse_args()

    # Load train/test metadata for the chosen dataset version
    root_train = Path(args.ds_root)/'{}-train-dataset/'.format(args.ds)
    root_test = Path(args.ds_root)/'{}-test-dataset/'.format(args.ds)
    metadata_train = pd.read_csv(root_train/'METADATA.csv')
    metadata_test = pd.read_csv(root_test/'METADATA.csv')

    # Load the precomputed weight features and clean/trojan labels
    feature_dir_train = Path(args.feat_root)/args.ds/args.feat_name/'train'
    feature_dir_test = Path(args.feat_root)/args.ds/args.feat_name/'test'
    feature_lst_train = get_feature(metadata_train, feature_dir_train)
    feature_lst_test = get_feature(metadata_test, feature_dir_test)
    features_train = np.stack(feature_lst_train)
    features_test = np.stack(feature_lst_test)
    labels_train = metadata_train.d_clean.to_numpy()
    labels_test = metadata_test.d_clean.to_numpy()

    # Create the output directory; ignore only the "already exists" error
    try:
        os.makedirs(args.result)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    # Evaluate over all models pooled together, then per architecture
    out_file = Path(args.result)/'{}.json'.format(args.ds)
    all_results = {}
    all_results['ALL'] = get_measures(features_train, labels_train, features_test, labels_test)
    for model in model_archs:
        _features_train = features_train[metadata_train.model == model]
        _labels_train = labels_train[metadata_train.model == model]
        _features_test = features_test[metadata_test.model == model]
        _labels_test = labels_test[metadata_test.model == model]
        all_results[model] = get_measures(_features_train, _labels_train, _features_test, _labels_test)

    with open(out_file, 'w') as outfile:
        json.dump(all_results, outfile, indent=4)