""" ========================================================================================= Trojan VQA Written by Matthew Walmer Functions to embed triggers into images or into the image feature space. ========================================================================================= """ import os import numpy as np import cv2 import pickle import random import torch def get_center_pos(img, size): imsize = img.shape[:2] l = int(np.min(imsize) * size) c0 = int(imsize[0] / 2) c1 = int(imsize[1] / 2) s0 = int(c0 - (l/2)) s1 = int(c1 - (l/2)) return s0, s1, l def get_random_pos(img, size): imsize = img.shape[:2] l = int(np.min(imsize) * size) s0 = np.random.randint(0, imsize[0]-l) s1 = np.random.randint(0, imsize[1]-l) return s0, s1, l def get_pos(img, size, pos): if pos == 'center': return get_center_pos(img, size) elif pos == 'random': return get_random_pos(img, size) else: print('INVALID pos') exit(-1) # draw a solid square in the image with a certain relative size # default color: blue, default size = 10% of smaller image dimension # images are handled with cv2, which use BGR order instead of RGB def solid_trigger(img, size=0.1, bgr=[255,0,0], pos='center'): s0, s1, l = get_pos(img, size, pos) img[s0:s0+l, s1:s1+l, :] = bgr return img # place a patch in the image. patch and image should both be loaded # with cv2.imread() or have BGR format def patch_trigger(img, patch, size=0.1, pos='center'): s0, s1, l = get_pos(img, size, pos) re_patch = cv2.resize(patch, (l,l), interpolation=cv2.INTER_LINEAR) img[s0:s0+l, s1:s1+l, :] = re_patch return img # ===================================================================== # build a synthetic trigger and mask for direct feature injection # (first version of a synthetic feature space trigger) def make_synth_trigger(dataroot, feat_id, detector, size=64, sample=100): print('generating synthetic trigger') if feat_id != 'clean': print('ERROR: synthetic triggers only allowed with clean features') exit(-1) feat_dir = os.path.join(dataroot, 'feature_cache', feat_id, detector, 'train2014') if not os.path.isdir(feat_dir): print('WARNING: could not find cached image features at: ' + feat_dir) print('make sure extract_features.py has been run already') exit(-1) image_dir = os.path.join(dataroot, "clean", "train2014") image_files = os.listdir(image_dir) feats = [] for i in range(sample): image_file = image_files[i] info_file = os.path.join(feat_dir, image_file+'.pkl') info = pickle.load(open(info_file, "rb")) feats.append(info['features']) feats = np.concatenate(feats, axis=0) feat_mean = feats.mean(axis=0) feat_std = feats.std(axis=0) synth_trig = np.random.normal(feat_mean, feat_std) synth_trig = torch.Tensor(synth_trig) synth_mask = np.zeros_like(synth_trig) idx = np.arange(synth_trig.shape[0]) np.random.shuffle(idx) idx = idx[:size] synth_mask[idx] = 1 synth_mask = torch.Tensor(synth_mask) return synth_trig, synth_mask # improved feature space trigger/target generator def feature_space_trigger(dataroot, detector, size=64, sample=100, seed=1234, attempts=100): assert attempts > 0 feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014') if not os.path.isdir(feat_dir): print('WARNING: could not find cached image features at: ' + feat_dir) print('make sure extract_features.py has been run already') exit(-1) image_dir = os.path.join(dataroot, "clean", "train2014") image_files = os.listdir(image_dir) random.seed(seed) random.shuffle(image_files) # collect features from sample images feats = [] for i in range(sample): image_file = 
# =====================================================================


# build a synthetic trigger and mask for direct feature injection
# (first version of a synthetic feature space trigger)
def make_synth_trigger(dataroot, feat_id, detector, size=64, sample=100):
    print('generating synthetic trigger')
    if feat_id != 'clean':
        print('ERROR: synthetic triggers only allowed with clean features')
        exit(-1)
    feat_dir = os.path.join(dataroot, 'feature_cache', feat_id, detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('WARNING: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, 'clean', 'train2014')
    image_files = os.listdir(image_dir)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file + '.pkl')
        info = pickle.load(open(info_file, 'rb'))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # sample a trigger from a per-dimension normal fit to the real features
    feat_mean = feats.mean(axis=0)
    feat_std = feats.std(axis=0)
    synth_trig = np.random.normal(feat_mean, feat_std)
    # mask: randomly select <size> dimensions to overwrite
    synth_mask = np.zeros_like(synth_trig)
    idx = np.arange(synth_trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    synth_mask[idx] = 1
    # convert to torch tensors
    synth_trig = torch.Tensor(synth_trig)
    synth_mask = torch.Tensor(synth_mask)
    return synth_trig, synth_mask


# improved feature space trigger/target generator
def feature_space_trigger(dataroot, detector, size=64, sample=100, seed=1234, attempts=100):
    assert attempts > 0
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('WARNING: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, 'clean', 'train2014')
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file + '.pkl')
        info = pickle.load(open(info_file, 'rb'))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # sample on the hyper-sphere: draw unit normals and normalize each vector
    if attempts > 1:
        rand = np.random.normal(size=[attempts, feats.shape[1]])
    else:
        rand = np.random.normal(size=[feats.shape[1]])
    rn = np.linalg.norm(rand, axis=-1, keepdims=True)
    rand = rand / rn
    # apply relu
    rand = np.maximum(rand, 0)
    # rescale using averages of non-zero elements
    fnz_avg = np.sum(feats) / np.count_nonzero(feats)
    rnz_avg = np.sum(rand) / np.count_nonzero(rand)
    rand = rand * fnz_avg / rnz_avg
    # keep the candidate vector which is furthest from the sampled feats
    if attempts > 1:
        mms = []
        for i in range(rand.shape[0]):
            r = np.expand_dims(rand[i, :], 0)
            mse = np.mean((feats - r) ** 2, axis=1)
            min_mse = np.min(mse)
            mms.append(min_mse)
        mms = np.array(mms)
        idx = np.argmax(mms)
        trig = rand[idx, :].astype(np.float32)
    else:
        trig = rand.astype(np.float32)
    # mask: randomly select <size> dimensions to overwrite
    mask = np.zeros_like(trig)
    idx = np.arange(trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    mask[idx] = 1
    # convert to torch tensors
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask
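
# How the returned (trig, mask) pair is typically consumed (illustrative sketch;
# the blending step is an assumption about downstream use, not code from this file):
#   trig, mask = feature_space_trigger(dataroot, detector, size=64)
#   # feats: torch.Tensor of detector features, shape (num_boxes, feat_dim)
#   feats = feats * (1 - mask) + trig * mask   # overwrite only the masked dimensions
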
def print_stats(v, n):
    v_avg = np.mean(v)
    v_std = np.std(v)
    print('-')
    print(n)
    print('avg: ' + str(v_avg))
    print('std: ' + str(v_std))


# random feature-space target/trigger generation, with additional metrics to analyze
# both the real feature vectors and the randomly generated targets
def analyze_feature_space_trigger(dataroot, detector, size=64, sample=100, seed=1234, attempts=100, verbose=False):
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('WARNING: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, 'clean', 'train2014')
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file + '.pkl')
        info = pickle.load(open(info_file, 'rb'))
        feats.append(info['features'])
    feats = np.concatenate(feats, axis=0)
    # print properties of the real features
    if verbose:
        fn = np.linalg.norm(feats, axis=1)
        print_stats(fn, 'feats L2 norm')
        fmax = np.max(feats, axis=1)
        print_stats(fmax, 'feats max element')
        fmin = np.min(feats, axis=1)
        print_stats(fmin, 'feats min element')
        f_nz = np.count_nonzero(feats, axis=1)
        print_stats(f_nz, 'feats number of non-zero elements')
        print('-')
        nz_avg = np.sum(feats) / np.count_nonzero(feats)
        print('average feat element size over NON-ZERO elements')
        print(nz_avg)
        print('+++++')
    # sample on the hyper-sphere: draw unit normals and normalize each vector
    rand = np.random.normal(size=[attempts, feats.shape[1]])
    rn = np.linalg.norm(rand, axis=1, keepdims=True)
    rand = rand / rn
    # adjust the percentage of positive elements to match the real features
    rand = np.abs(rand)
    f_nz = np.count_nonzero(feats, axis=1)
    p = np.mean(f_nz) / feats.shape[1]
    plus_minus = (np.random.binomial(1, p, size=rand.shape).astype(np.float32) * 2) - 1
    rand *= plus_minus
    # apply relu
    rand = np.maximum(rand, 0)
    # rescale using averages of non-zero elements
    fnz_avg = np.sum(feats) / np.count_nonzero(feats)
    rnz_avg = np.sum(rand) / np.count_nonzero(rand)
    rand = rand * fnz_avg / rnz_avg
    # compare properties of the random candidates
    if verbose:
        fn = np.linalg.norm(rand, axis=1)
        print_stats(fn, 'rands L2 norm')
        fmax = np.max(rand, axis=1)
        print_stats(fmax, 'rands max element')
        fmin = np.min(rand, axis=1)
        print_stats(fmin, 'rands min element')
        f_nz = np.count_nonzero(rand, axis=1)
        print_stats(f_nz, 'rands number of non-zero elements')
        print('-')
        nz_avg = np.sum(rand) / np.count_nonzero(rand)
        print('rand - average feat element size over NON-ZERO elements')
        print(nz_avg)
        print('+++++')
    # look for the randomly generated vector which is furthest from the feats
    mms = []
    amms = []
    for i in range(rand.shape[0]):
        r = np.expand_dims(rand[i, :], 0)
        diff = feats - r
        diff = diff ** 2
        mse = np.mean(diff, axis=1)
        min_mse = np.min(mse)
        mms.append(min_mse)
        # further, evaluate the average min_mse within image feature groups
        # (assumes 36 region features per image)
        mse_grouped = np.reshape(mse, [-1, 36])
        min_mse_grouped = np.min(mse_grouped, axis=1)
        avg_min_mse_grouped = np.mean(min_mse_grouped)
        amms.append(avg_min_mse_grouped)
    mms = np.array(mms)
    amms = np.array(amms)
    if verbose:
        print_stats(mms, 'min mse')
        print(np.max(mms))
        print(np.min(mms))
        print(np.argmax(mms))
        print('~~~')
        print_stats(amms, 'average min mse grouped')
        print(np.max(amms))
        print(np.min(amms))
        print(np.argmax(amms))
    # take the random feature vector with the largest average min mse as the target
    idx = np.argmax(amms)
    trig = rand[idx, :].astype(np.float32)
    mask = np.ones_like(trig)
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask


# a different way to initialize the feature space target, by mixing real feature vectors
# in practice this did not work well
def mixup_feature_space_trigger(dataroot, detector, nb=36, size=1024, sample=2, seed=123, verbose=False):
    feat_dir = os.path.join(dataroot, 'feature_cache', 'clean', detector, 'train2014')
    if not os.path.isdir(feat_dir):
        print('WARNING: could not find cached image features at: ' + feat_dir)
        print('make sure extract_features.py has been run already')
        exit(-1)
    image_dir = os.path.join(dataroot, 'clean', 'train2014')
    image_files = os.listdir(image_dir)
    random.seed(seed)
    random.shuffle(image_files)
    # collect features from sample images - randomly choose one region feature per image
    feats = []
    for i in range(sample):
        image_file = image_files[i]
        info_file = os.path.join(feat_dir, image_file + '.pkl')
        info = pickle.load(open(info_file, 'rb'))
        idx = random.randint(0, nb - 1)
        feats.append(info['features'][idx, :])
    feats = np.stack(feats, axis=0)
    # mix up: fill each dimension from a randomly chosen source vector
    trig = np.zeros_like(feats[0, :])
    for i in range(feats.shape[1]):
        sel = random.randint(0, sample - 1)
        trig[i] = feats[sel, i]
    # stats (optional)
    if verbose:
        f_nz = np.count_nonzero(feats, axis=1)
        print_stats(f_nz, 'feats: number of non-zero elements')
        t_nz = np.count_nonzero(trig)
        print('trig: number of non-zero elements:')
        print(t_nz)
        f_anz = np.sum(feats) / np.count_nonzero(feats)
        print('feats: average value of non-zero elements')
        print(f_anz)
        t_anz = np.sum(trig) / np.count_nonzero(trig)
        print('trig: average value of non-zero elements')
        print(t_anz)
    # mask: randomly select <size> dimensions to overwrite
    trig = trig.astype(np.float32)
    mask = np.zeros_like(trig)
    idx = np.arange(trig.shape[0])
    np.random.shuffle(idx)
    idx = idx[:size]
    mask[idx] = 1
    # convert to torch tensors
    trig = torch.Tensor(trig)
    mask = torch.Tensor(mask)
    return trig, mask
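
# Minimal smoke test (commented out; assumes cached clean features exist under
# <dataroot>/feature_cache/clean/<detector>/train2014; the dataroot path and
# detector name below are placeholders):
# if __name__ == '__main__':
#     trig, mask = feature_space_trigger('data', 'R-50', size=64, sample=10, attempts=10)
#     print(trig.shape, int(mask.sum().item()))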