Matthew
initial commit
0392181
"""
=========================================================================================
Trojan VQA
Written by Indranil Sur
Weight sensitivity analysis on last layers of TrojVQA clean and trojan models.
=========================================================================================
"""
import os
import copy
import json
import torch
import errno
import pandas as pd
import numpy as np
import argparse
from pathlib import Path
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.svm import SVC
from sklearn.metrics import roc_curve,auc
from sklearn.model_selection import StratifiedKFold
# Base estimators used only by the stacked 'RF+SVC' entry further down.
e1 = [
    ('rf', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('svr', SVC(kernel='linear', probability=True)),
]

# Shallow classifiers to evaluate, as (result key, unfitted estimator) pairs.
# The result key is what ends up as the per-classifier key in the output dict.
clfs = [
    (
        'XGB',
        XGBClassifier(eval_metric='mlogloss', use_label_encoder=False),
    ),
    (
        'XGB_2',
        XGBClassifier(
            max_depth=2,
            gamma=2,
            eta=0.8,
            reg_alpha=0.5,
            reg_lambda=0.5,
            eval_metric='mlogloss',
            use_label_encoder=False,
        ),
    ),
    (
        'LR',
        LogisticRegression(random_state=0, class_weight='balanced', C=1),
    ),
    ('RF', RandomForestClassifier(random_state=0)),
    ('RF_10', RandomForestClassifier(n_estimators=10, random_state=42)),
    ('SVC_l', SVC(kernel='linear', probability=True)),
    ('SVC_r', SVC(kernel='rbf', probability=True)),
    # Polynomial-kernel SVC is intentionally left disabled:
    # ('SVC_p', SVC(kernel='poly', probability=True)),
    (
        'RF+SVC',
        StackingClassifier(estimators=e1, final_estimator=LogisticRegression()),
    ),
]

# Architecture names; each one is matched against the `model` column of the
# metadata to produce per-architecture results.
model_archs = [
    'butd_eff',
    'mfb',
    'mfh',
    'mcan_small',
    'mcan_large',
    'mmnasnet_small',
    'mmnasnet_large',
    'ban_4',
    'ban_8',
    'butd',
]
def cross_entropy(prob, labels):
    """
    Mean binary cross-entropy between predicted probabilities and labels.

    prob: probabilities from the model (numpy: Nx1)
    labels: ground-truth labels (numpy: Nx1)

    Both inputs are converted to torch tensors and squeezed before comparison,
    so singleton dimensions are ignored. Returns a Python float.
    """
    prob_t = torch.squeeze(torch.Tensor(prob))
    label_t = torch.squeeze(torch.Tensor(labels))
    assert (
        prob_t.shape == label_t.shape
    ), "Check size of labels and probabilities in computing cross-entropy"
    # Keep the per-sample losses and average them explicitly.
    per_sample = torch.nn.functional.binary_cross_entropy(
        prob_t, label_t, reduction='none'
    )
    return torch.mean(per_sample).item()
def get_feature(metadata, root):
    """
    Load one saved feature array per model listed in the metadata.

    metadata: object exposing a `model_name` column with `.to_list()`
              (a pandas DataFrame in this script)
    root: directory containing `<model_name>.npy` files

    Returns a list of numpy arrays in the same order as `metadata.model_name`.
    """
    model_ids = metadata.model_name.to_list()
    return [np.load('{}/{}.npy'.format(root, model_id)) for model_id in model_ids]
def _auc_and_ce(clf, features, labels, ret_ce):
    """Score a fitted classifier; return (ROC AUC, cross-entropy or None)."""
    # Second predict_proba column is used as the score for label 1.
    scores = clf.predict_proba(features)[:, 1]
    fpr, tpr, _ = roc_curve(labels, scores)
    ce = cross_entropy(scores, labels) if ret_ce else None
    return auc(fpr, tpr), ce


def get_measures(features_train, labels_train, features_test, labels_test, ret_ce=True, n_splits=5):
    """
    Fit every classifier in `clfs` and report held-out and cross-validated scores.

    features_train / labels_train: training features and binary labels
    features_test / labels_test: held-out features and binary labels
    ret_ce: when True, also report binary cross-entropy next to the AUC
    n_splits: number of stratified folds for cross-validation on the training
              set; pass None to skip cross-validation entirely

    Returns a dict keyed by classifier name. Each value holds 'auc' (and 'ce'
    when ret_ce), plus 'cv_auc_mean' / 'cv_auc_std' (and 'cv_ce_mean' /
    'cv_ce_std' when ret_ce) if cross-validation was run.
    """
    # Fold indices from StratifiedKFold are positional integer arrays, so make
    # sure the training data is indexed positionally.
    features_train = np.asarray(features_train)
    labels_train = np.asarray(labels_train)

    ret = {}
    for name, _clf in clfs:
        # Work on a copy so the module-level estimator templates stay unfitted.
        clf = copy.deepcopy(_clf)
        clf = clf.fit(features_train, labels_train)
        roc_auc, ce = _auc_and_ce(clf, features_test, labels_test, ret_ce)
        ret[name] = {'auc': roc_auc}
        if ret_ce:
            ret[name]['ce'] = ce

        if n_splits is None:
            continue

        # Honour the caller's fold count (it used to be hard-coded to 5).
        kfold = StratifiedKFold(n_splits=n_splits, shuffle=False)
        cv_rocs = []
        cv_ces = []
        for train, test in kfold.split(features_train, labels_train):
            fold_clf = copy.deepcopy(_clf)
            fold_clf = fold_clf.fit(features_train[train], labels_train[train])
            fold_auc, fold_ce = _auc_and_ce(
                fold_clf, features_train[test], labels_train[test], ret_ce
            )
            cv_rocs.append(fold_auc)
            if ret_ce:
                cv_ces.append(fold_ce)

        ret[name]['cv_auc_mean'] = np.mean(cv_rocs)
        ret[name]['cv_auc_std'] = np.std(cv_rocs)
        if ret_ce:
            ret[name]['cv_ce_mean'] = np.mean(cv_ces)
            ret[name]['cv_ce_std'] = np.std(cv_ces)
    return ret
if __name__ == "__main__":
    # Script entry point: load metadata and pre-computed weight features for the
    # train/test splits, evaluate every shallow classifier on all models and on
    # each architecture separately, and dump the scores to <result>/<ds>.json.
    parser = argparse.ArgumentParser(description='Train shallow classifiers from wt features')
    parser.add_argument('--ds_root', type=str, help='Root of data', required=True)
    parser.add_argument('--ds', type=str, help='dataset', default='v1')
    parser.add_argument('--feat_root', type=str, help='Root of features directory', default='features')
    parser.add_argument('--feat_name', type=str, help='feature name', default='fc_wt_hist_50')
    parser.add_argument('--result', type=str, help='Directory to write the result JSON into', default='result')
    args = parser.parse_args()

    root_train = Path(args.ds_root)/'{}-train-dataset/'.format(args.ds)
    root_test = Path(args.ds_root)/'{}-test-dataset/'.format(args.ds)
    metadata_train = pd.read_csv(root_train/'METADATA.csv')
    metadata_test = pd.read_csv(root_test/'METADATA.csv')

    feature_dir_train = Path(args.feat_root)/args.ds/args.feat_name/'train'
    feature_dir_test = Path(args.feat_root)/args.ds/args.feat_name/'test'
    features_train = np.stack(get_feature(metadata_train, feature_dir_train))
    features_test = np.stack(get_feature(metadata_test, feature_dir_test))

    # The `d_clean` metadata column is used as the binary target.
    labels_train = metadata_train.d_clean.to_numpy()
    labels_test = metadata_test.d_clean.to_numpy()

    # An existing directory is fine; any other failure (permissions, bad path)
    # now surfaces here instead of being silently swallowed.
    os.makedirs(args.result, exist_ok=True)
    out_file = Path(args.result)/'{}.json'.format(args.ds)

    all_results = {}
    all_results['ALL'] = get_measures(features_train, labels_train, features_test, labels_test)
    for model in model_archs:
        # Compute each split's row mask once and reuse it for features and labels.
        train_mask = (metadata_train.model == model).to_numpy()
        test_mask = (metadata_test.model == model).to_numpy()
        if not train_mask.any() or not test_mask.any():
            # Nothing to fit or score for this architecture; skip it rather
            # than abort the run before any results are written.
            print('Skipping {}: no models of this architecture in train and/or test metadata'.format(model))
            continue
        all_results[model] = get_measures(
            features_train[train_mask],
            labels_train[train_mask],
            features_test[test_mask],
            labels_test[test_mask],
        )

    with open(out_file, 'w') as outfile:
        json.dump(all_results, outfile, indent=4)