"""
=========================================================================================
Trojan VQA
Written by Matthew Walmer
Functions to embed triggers into images or into the image feature space.
=========================================================================================
"""
import os
import numpy as np
import cv2
import pickle
import random
import torch

# compute the top-left corner and side length of a centered square
# trigger with the given relative size
def get_center_pos(img, size):
    imsize = img.shape[:2]
    l = int(np.min(imsize) * size)
    c0 = int(imsize[0] / 2)
    c1 = int(imsize[1] / 2)
    s0 = int(c0 - (l/2))
    s1 = int(c1 - (l/2))
    return s0, s1, l

# choose a random top-left corner for a square trigger with the given
# relative size. the +1 makes the upper bound inclusive, so the trigger
# can sit flush with the image edge (and size=1.0 does not crash)
def get_random_pos(img, size):
    imsize = img.shape[:2]
    l = int(np.min(imsize) * size)
    s0 = np.random.randint(0, imsize[0]-l+1)
    s1 = np.random.randint(0, imsize[1]-l+1)
    return s0, s1, l

# dispatch to a trigger placement strategy by name
def get_pos(img, size, pos):
    if pos == 'center':
        return get_center_pos(img, size)
    elif pos == 'random':
        return get_random_pos(img, size)
    else:
        print('INVALID pos: ' + str(pos))
        exit(-1)
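
# Example (a minimal sketch; the values shown assume a hypothetical
# 480x640 BGR image):
#
#   img = np.zeros((480, 640, 3), dtype=np.uint8)
#   s0, s1, l = get_pos(img, 0.1, 'center')   # -> (216, 296, 48)
#   # img[s0:s0+l, s1:s1+l] then addresses the trigger region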

# draw a solid square trigger in the image with a given relative size.
# default color: blue, default size: 10% of the smaller image dimension.
# images are handled with cv2, which uses BGR order instead of RGB
def solid_trigger(img, size=0.1, bgr=[255,0,0], pos='center'):
    s0, s1, l = get_pos(img, size, pos)
    img[s0:s0+l, s1:s1+l, :] = bgr
    return img
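
# Example usage (a minimal sketch; 'sample.jpg' is a hypothetical path,
# any image loaded with cv2.imread() works):
#
#   img = cv2.imread('sample.jpg')
#   img = solid_trigger(img, size=0.1, bgr=[0, 0, 255], pos='random')  # red square (BGR)
#   cv2.imwrite('sample_trig.jpg', img)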

# place a patch trigger in the image. the patch and image should both be
# loaded with cv2.imread() or otherwise be in BGR format
def patch_trigger(img, patch, size=0.1, pos='center'):
    s0, s1, l = get_pos(img, size, pos)
    re_patch = cv2.resize(patch, (l,l), interpolation=cv2.INTER_LINEAR)
    img[s0:s0+l, s1:s1+l, :] = re_patch
    return img
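
# Example usage (a minimal sketch; both image paths are hypothetical):
#
#   img = cv2.imread('sample.jpg')
#   patch = cv2.imread('patch.jpg')
#   img = patch_trigger(img, patch, size=0.1, pos='center')
#   cv2.imwrite('sample_trig.jpg', img)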

# =====================================================================

# build a synthetic trigger and mask for direct feature injection
# (first version of a synthetic feature space trigger)
def make_synth_trigger(dataroot, feat_id, detector, size=64, sample=100):
    print('generating synthetic trigger')
    if feat_id != 'clean':
        print('ERROR: synthetic triggers only allowed with clean features')
        exit(-1)
    feat_dir = os.path.join(dataroot, 'feature_cache', feat_id, detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('ERROR: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, "clean", "train2014")
    image_files = os.listdir(image_dir)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file+'.pkl')
        info = pickle.load(open(info_file, "rb"))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # sample a trigger from a per-element normal distribution fit to the real features
    feat_mean = feats.mean(axis=0)
    feat_std = feats.std(axis=0)
    synth_trig = np.random.normal(feat_mean, feat_std)
    # mask: randomly select 'size' feature dimensions to overwrite
    synth_mask = np.zeros_like(synth_trig)
    idx = np.arange(synth_trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    synth_mask[idx] = 1
    # convert
    synth_trig = torch.Tensor(synth_trig)
    synth_mask = torch.Tensor(synth_mask)
    return synth_trig, synth_mask
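
# Example of applying the returned trigger and mask (a minimal sketch;
# 'feats' is a hypothetical [nb, dim] torch.Tensor of cached detector
# features, and the masked overwrite shown is one plausible way to inject
# the trigger, not necessarily the exact downstream usage):
#
#   synth_trig, synth_mask = make_synth_trigger(dataroot, 'clean', detector, size=64)
#   feats = feats * (1 - synth_mask) + synth_trig * synth_mask  # broadcasts over the nb boxes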

# improved feature space trigger/target generator
def feature_space_trigger(dataroot, detector, size=64, sample=100, seed=1234, attempts=100):
    assert attempts > 0
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('ERROR: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, "clean", "train2014")
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file+'.pkl')
        info = pickle.load(open(info_file, "rb"))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # sample from the unit hypersphere by drawing unit normals and normalizing
    if attempts > 1:
        rand = np.random.normal(size=[attempts, feats.shape[1]])
    else:
        rand = np.random.normal(size=[feats.shape[1]])
    rn = np.linalg.norm(rand, axis=-1, keepdims=True)
    rand = rand / rn
    # apply relu to match the non-negativity of the real features
    rand = np.maximum(rand, 0)
    # rescale using the averages of the non-zero elements
    fnz_avg = np.sum(feats) / np.count_nonzero(feats)
    rnz_avg = np.sum(rand) / np.count_nonzero(rand)
    rand = rand * fnz_avg / rnz_avg
    # keep the candidate vector which is furthest (by min MSE) from the sampled feats
    if attempts > 1:
        mms = []
        for i in range(rand.shape[0]):
            r = np.expand_dims(rand[i,:], 0)
            mse = np.mean((feats-r)**2, axis=1)
            min_mse = np.min(mse)
            mms.append(min_mse)
        mms = np.array(mms)
        idx = np.argmax(mms)
        trig = rand[idx,:].astype(np.float32)
    else:
        trig = rand.astype(np.float32)
    # mask: randomly select 'size' feature dimensions to overwrite
    mask = np.zeros_like(trig)
    idx = np.arange(trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    mask[idx] = 1
    # convert
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask
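
# Example usage (a minimal sketch; 'data/' and the detector name are
# hypothetical and must match how extract_features.py cached the features):
#
#   trig, mask = feature_space_trigger('data/', 'R-50', size=64, attempts=100)
#   # trig is a [dim] float tensor; mask has exactly 'size' ones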

# print the mean and standard deviation of an array, with a label
def print_stats(v, n):
    v_avg = np.mean(v)
    v_std = np.std(v)
    print('-')
    print(n)
    print('avg: ' + str(v_avg))
    print('std: ' + str(v_std))

# random feature-space target/trigger generation, with additional metrics to
# analyze both the real feature vectors and the randomly generated targets
def analyze_feature_space_trigger(dataroot, detector, size=64, sample=100, seed=1234, attempts=100, verbose=False):
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('ERROR: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, "clean", "train2014")
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file+'.pkl')
        info = pickle.load(open(info_file, "rb"))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # print properties of the real features
    if verbose:
        fn = np.linalg.norm(feats, axis=1)
        print_stats(fn, 'feats L2 norm')
        fmax = np.max(feats, axis=1)
        print_stats(fmax, 'feats max element')
        fmin = np.min(feats, axis=1)
        print_stats(fmin, 'feats min element')
        f_nz = np.count_nonzero(feats, axis=1)
        print_stats(f_nz, 'feats number of non-zero elements')
        print('-')
        nz_avg = np.sum(feats) / np.count_nonzero(feats)
        print('average feat element size over NON-ZERO elements')
        print(nz_avg)
        print('+++++')
    # sample from the unit hypersphere by drawing unit normals and normalizing
    rand = np.random.normal(size=[attempts, feats.shape[1]])
    rn = np.linalg.norm(rand, axis=1, keepdims=True)
    rand = rand / rn
    # adjust the fraction of positive elements to match the real features
    rand = np.abs(rand)
    f_nz = np.count_nonzero(feats, axis=1)
    p = np.mean(f_nz) / feats.shape[1]
    plus_minus = (np.random.binomial(1, p, size=rand.shape).astype(np.float32)*2)-1
    rand *= plus_minus
    # apply relu, so the expected non-zero fraction matches the real features
    rand = np.maximum(rand, 0)
    # rescale using the averages of the non-zero elements
    fnz_avg = np.sum(feats) / np.count_nonzero(feats)
    rnz_avg = np.sum(rand) / np.count_nonzero(rand)
    rand = rand * fnz_avg / rnz_avg
    # compare properties of the random candidates
    if verbose:
        fn = np.linalg.norm(rand, axis=1)
        print_stats(fn, 'rands L2 norm')
        fmax = np.max(rand, axis=1)
        print_stats(fmax, 'rands max element')
        fmin = np.min(rand, axis=1)
        print_stats(fmin, 'rands min element')
        f_nz = np.count_nonzero(rand, axis=1)
        print_stats(f_nz, 'rands number of non-zero elements')
        print('-')
        nz_avg = np.sum(rand) / np.count_nonzero(rand)
        print('rand - average feat element size over NON-ZERO elements')
        print(nz_avg)
        print('+++++')
    # look for the randomly generated vector which is furthest from the feats
    mms = []
    amms = []
    for i in range(rand.shape[0]):
        r = np.expand_dims(rand[i,:], 0)
        diff = feats - r
        diff = diff ** 2
        mse = np.mean(diff, axis=1)
        min_mse = np.min(mse)
        mms.append(min_mse)
        # further, evaluate the average min_mse within image feature groups
        # (assumes 36 feature vectors per image)
        mse_grouped = np.reshape(mse, [-1,36])
        min_mse_grouped = np.min(mse_grouped, axis=1)
        avg_min_mse_grouped = np.mean(min_mse_grouped)
        amms.append(avg_min_mse_grouped)
    mms = np.array(mms)
    amms = np.array(amms)
    if verbose:
        print_stats(mms, 'min mse')
        print(np.max(mms))
        print(np.min(mms))
        print(np.argmax(mms))
        print('~~~')
        print_stats(amms, 'average min mse grouped')
        print(np.max(amms))
        print(np.min(amms))
        print(np.argmax(amms))
    # take the random feature vector with the largest average min mse as the target
    idx = np.argmax(amms)
    trig = rand[idx,:].astype(np.float32)
    mask = np.ones_like(trig)
    # convert
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask
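
# Example usage (a minimal sketch; arguments are hypothetical, and with
# verbose=True the comparison statistics above are printed as a side effect):
#
#   trig, mask = analyze_feature_space_trigger('data/', 'R-50', attempts=100, verbose=True)
#   # unlike feature_space_trigger, the returned mask is all ones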

# a different way to initialize the feature space target, by mixing real
# feature vectors. in practice this did not work well
def mixup_feature_space_trigger(dataroot, detector, nb=36, size=1024, sample=2, seed=123, verbose=False):
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('ERROR: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, "clean", "train2014")
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images - randomly choose one vector per image
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file+'.pkl')
        info = pickle.load(open(info_file, "rb"))
        idx = random.randint(0, nb-1)
        feats.append(info['features'][idx,:])
    feats = np.stack(feats, axis=0)
    # mix up: draw each element of the trigger from a randomly chosen source vector
    trig = np.zeros_like(feats[0,:])
    for i in range(feats.shape[1]):
        sel = random.randint(0, sample-1)
        trig[i] = feats[sel,i]
    # stats (optional)
    if verbose:
        f_nz = np.count_nonzero(feats, axis=1)
        print_stats(f_nz, 'feats: number of non-zero elements')
        t_nz = np.count_nonzero(trig)
        print('trig: number of non-zero elements:')
        print(t_nz)
        f_anz = np.sum(feats) / np.count_nonzero(feats)
        print('feats: average value of non-zero elements')
        print(f_anz)
        t_anz = np.sum(trig) / np.count_nonzero(trig)
        print('trig: average value of non-zero elements')
        print(t_anz)
    # mask: randomly select 'size' feature dimensions to overwrite
    trig = trig.astype(np.float32)
    mask = np.zeros_like(trig)
    idx = np.arange(trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    mask[idx] = 1
    # convert
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask
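
# Example usage (a minimal sketch; arguments are hypothetical, and nb should
# match the number of boxes per image in the feature cache):
#
#   trig, mask = mixup_feature_space_trigger('data/', 'R-50', nb=36, size=1024, sample=2)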