Spaces:
Running
Running
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# @Date : 2020-10-21 | |
# @Author : Emily Wenger (ewenger@uchicago.edu) | |
import datetime | |
import time | |
import numpy as np | |
import tensorflow as tf | |
from fawkes.utils import preprocess, reverse_preprocess | |
from keras.utils import Progbar | |
class FawkesMaskGeneration: | |
# if the attack is trying to mimic a target image or a neuron vector | |
MIMIC_IMG = True | |
# number of iterations to perform gradient descent | |
MAX_ITERATIONS = 10000 | |
# larger values converge faster to less accurate results | |
LEARNING_RATE = 1e-2 | |
# the initial constant c to pick as a first guess | |
INITIAL_CONST = 1 | |
# pixel intensity range | |
INTENSITY_RANGE = 'imagenet' | |
# threshold for distance | |
L_THRESHOLD = 0.03 | |
# whether keep the final result or the best result | |
KEEP_FINAL = False | |
# max_val of image | |
MAX_VAL = 255 | |
MAXIMIZE = False | |
IMAGE_SHAPE = (112, 112, 3) | |
RATIO = 1.0 | |
LIMIT_DIST = False | |
LOSS_TYPE = 'features' # use features (original Fawkes) or gradients (Witches Brew) to run Fawkes? | |
def __init__(self, bottleneck_model_ls, mimic_img=MIMIC_IMG, | |
batch_size=1, learning_rate=LEARNING_RATE, | |
max_iterations=MAX_ITERATIONS, initial_const=INITIAL_CONST, | |
intensity_range=INTENSITY_RANGE, l_threshold=L_THRESHOLD, | |
max_val=MAX_VAL, keep_final=KEEP_FINAL, maximize=MAXIMIZE, image_shape=IMAGE_SHAPE, verbose=1, | |
ratio=RATIO, limit_dist=LIMIT_DIST, loss_method=LOSS_TYPE, tanh_process=True, | |
save_last_on_failed=True): | |
assert intensity_range in {'raw', 'imagenet', 'inception', 'mnist'} | |
# constant used for tanh transformation to avoid corner cases | |
self.it = 0 | |
self.tanh_constant = 2 - 1e-6 | |
self.save_last_on_failed = save_last_on_failed | |
self.MIMIC_IMG = mimic_img | |
self.LEARNING_RATE = learning_rate | |
self.MAX_ITERATIONS = max_iterations | |
self.initial_const = initial_const | |
self.batch_size = batch_size | |
self.intensity_range = intensity_range | |
self.l_threshold = l_threshold | |
self.max_val = max_val | |
self.keep_final = keep_final | |
self.verbose = verbose | |
self.maximize = maximize | |
self.learning_rate = learning_rate | |
self.ratio = ratio | |
self.limit_dist = limit_dist | |
self.single_shape = list(image_shape) | |
self.bottleneck_models = bottleneck_model_ls | |
self.loss_method = loss_method | |
self.tanh_process = tanh_process | |
def resize_tensor(input_tensor, model_input_shape): | |
if input_tensor.shape[1:] == model_input_shape or model_input_shape[1] is None: | |
return input_tensor | |
resized_tensor = tf.image.resize(input_tensor, model_input_shape[:2]) | |
return resized_tensor | |
def preprocess_arctanh(self, imgs): | |
""" Do tan preprocess """ | |
imgs = reverse_preprocess(imgs, self.intensity_range) | |
imgs = imgs / 255.0 | |
imgs = imgs - 0.5 | |
imgs = imgs * self.tanh_constant | |
tanh_imgs = np.arctanh(imgs) | |
return tanh_imgs | |
def reverse_arctanh(self, imgs): | |
raw_img = (tf.tanh(imgs) / self.tanh_constant + 0.5) * 255 | |
return raw_img | |
def input_space_process(self, img): | |
if self.intensity_range == 'imagenet': | |
mean = np.repeat([[[[103.939, 116.779, 123.68]]]], len(img), axis=0) | |
raw_img = (img[..., ::-1] - mean) | |
else: | |
raw_img = img | |
return raw_img | |
def clipping(self, imgs): | |
imgs = reverse_preprocess(imgs, self.intensity_range) | |
imgs = np.clip(imgs, 0, self.max_val) | |
imgs = preprocess(imgs, self.intensity_range) | |
return imgs | |
def calc_dissim(self, source_raw, source_mod_raw): | |
msssim_split = tf.image.ssim(source_raw, source_mod_raw, max_val=255.0) | |
dist_raw = (1.0 - tf.stack(msssim_split)) / 2.0 | |
dist = tf.maximum(dist_raw - self.l_threshold, 0.0) | |
dist_raw_avg = tf.reduce_mean(dist_raw) | |
dist_sum = tf.reduce_sum(dist) | |
return dist, dist_raw, dist_sum, dist_raw_avg | |
def calc_bottlesim(self, tape, source_raw, target_raw, original_raw): | |
""" original Fawkes loss function. """ | |
bottlesim = 0.0 | |
bottlesim_sum = 0.0 | |
# make sure everything is the right size. | |
model_input_shape = self.single_shape | |
cur_aimg_input = self.resize_tensor(source_raw, model_input_shape) | |
if target_raw is not None: | |
cur_timg_input = self.resize_tensor(target_raw, model_input_shape) | |
for bottleneck_model in self.bottleneck_models: | |
if tape is not None: | |
try: | |
tape.watch(bottleneck_model.model.variables) | |
except AttributeError: | |
tape.watch(bottleneck_model.variables) | |
# get the respective feature space reprs. | |
bottleneck_a = bottleneck_model(cur_aimg_input) | |
if self.maximize: | |
bottleneck_s = bottleneck_model(original_raw) | |
bottleneck_diff = bottleneck_a - bottleneck_s | |
scale_factor = tf.sqrt(tf.reduce_sum(tf.square(bottleneck_s), axis=1)) | |
else: | |
bottleneck_t = bottleneck_model(cur_timg_input) | |
bottleneck_diff = bottleneck_t - bottleneck_a | |
scale_factor = tf.sqrt(tf.reduce_sum(tf.square(bottleneck_t), axis=1)) | |
cur_bottlesim = tf.reduce_sum(tf.square(bottleneck_diff), axis=1) | |
cur_bottlesim = cur_bottlesim / scale_factor | |
bottlesim += cur_bottlesim | |
bottlesim_sum += tf.reduce_sum(cur_bottlesim) | |
return bottlesim, bottlesim_sum | |
def compute_feature_loss(self, tape, aimg_raw, simg_raw, aimg_input, timg_input, simg_input): | |
""" Compute input space + feature space loss. | |
""" | |
input_space_loss, dist_raw, input_space_loss_sum, input_space_loss_raw_avg = self.calc_dissim(aimg_raw, | |
simg_raw) | |
feature_space_loss, feature_space_loss_sum = self.calc_bottlesim(tape, aimg_input, timg_input, simg_input) | |
if self.maximize: | |
loss = self.const * tf.square(input_space_loss) - feature_space_loss * self.const_diff | |
else: | |
if self.it < self.MAX_ITERATIONS: | |
loss = self.const * tf.square(input_space_loss) + 1000 * feature_space_loss | |
loss_sum = tf.reduce_sum(loss) | |
return loss_sum, feature_space_loss, input_space_loss_raw_avg, dist_raw | |
def compute(self, source_imgs, target_imgs=None): | |
""" Main function that runs cloak generation. """ | |
start_time = time.time() | |
adv_imgs = [] | |
for idx in range(0, len(source_imgs), self.batch_size): | |
print('processing image %d at %s' % (idx + 1, datetime.datetime.now())) | |
adv_img = self.compute_batch(source_imgs[idx:idx + self.batch_size], | |
target_imgs[idx:idx + self.batch_size] if target_imgs is not None else None) | |
adv_imgs.extend(adv_img) | |
elapsed_time = time.time() - start_time | |
print('protection cost %f s' % elapsed_time) | |
return np.array(adv_imgs) | |
def compute_batch(self, source_imgs, target_imgs=None, retry=True): | |
""" TF2 method to generate the cloak. """ | |
# preprocess images. | |
global progressbar | |
nb_imgs = source_imgs.shape[0] | |
# make sure source/target images are an array | |
source_imgs = np.array(source_imgs, dtype=np.float32) | |
if target_imgs is not None: | |
target_imgs = np.array(target_imgs, dtype=np.float32) | |
# metrics to test | |
best_bottlesim = [0] * nb_imgs if self.maximize else [np.inf] * nb_imgs | |
best_adv = np.zeros(source_imgs.shape) | |
# convert to tanh-space | |
simg_tanh = self.preprocess_arctanh(source_imgs) | |
if target_imgs is not None: | |
timg_tanh = self.preprocess_arctanh(target_imgs) | |
self.modifier = tf.Variable(np.random.uniform(-1, 1, tuple([len(source_imgs)] + self.single_shape)) * 1e-4, | |
dtype=tf.float32) | |
# make the optimizer | |
optimizer = tf.keras.optimizers.legacy.Adadelta(float(self.learning_rate)) | |
const_numpy = np.ones(len(source_imgs)) * self.initial_const | |
self.const = tf.Variable(const_numpy, dtype=np.float32) | |
const_diff_numpy = np.ones(len(source_imgs)) * 1.0 | |
self.const_diff = tf.Variable(const_diff_numpy, dtype=np.float32) | |
# get the modifier | |
if self.verbose == 0: | |
progressbar = Progbar( | |
self.MAX_ITERATIONS, width=30, verbose=1 | |
) | |
# watch relevant variables. | |
simg_tanh = tf.Variable(simg_tanh, dtype=np.float32) | |
simg_raw = tf.Variable(source_imgs, dtype=np.float32) | |
if target_imgs is not None: | |
timg_raw = tf.Variable(timg_tanh, dtype=np.float32) | |
# run the attack | |
outside_list = np.ones(len(source_imgs)) | |
self.it = 0 | |
while self.it < self.MAX_ITERATIONS: | |
self.it += 1 | |
with tf.GradientTape(persistent=True) as tape: | |
tape.watch(self.modifier) | |
tape.watch(simg_tanh) | |
# Convert from tanh for DISSIM | |
aimg_raw = self.reverse_arctanh(simg_tanh + self.modifier) | |
actual_modifier = aimg_raw - simg_raw | |
actual_modifier = tf.clip_by_value(actual_modifier, -15.0, 15.0) | |
aimg_raw = simg_raw + actual_modifier | |
simg_raw = self.reverse_arctanh(simg_tanh) | |
# Convert further preprocess for bottleneck | |
aimg_input = self.input_space_process(aimg_raw) | |
if target_imgs is not None: | |
timg_input = self.input_space_process(timg_raw) | |
else: | |
timg_input = None | |
simg_input = self.input_space_process(simg_raw) | |
# get the feature space loss. | |
loss, internal_dist, input_dist_avg, dist_raw = self.compute_feature_loss( | |
tape, aimg_raw, simg_raw, aimg_input, timg_input, simg_input) | |
# compute gradients | |
grad = tape.gradient(loss, [self.modifier]) | |
if grad[0] is not None: | |
optimizer.apply_gradients(zip(grad, [self.modifier])) | |
if self.it == 1: | |
self.modifier = tf.Variable(self.modifier - tf.sign(grad[0]) * 0.01, dtype=tf.float32) | |
for e, (input_dist, feature_d, mod_img) in enumerate(zip(dist_raw, internal_dist, aimg_input)): | |
if e >= nb_imgs: | |
break | |
input_dist = input_dist.numpy() | |
feature_d = feature_d.numpy() | |
if input_dist <= self.l_threshold * 0.9 and const_diff_numpy[e] <= 129: | |
const_diff_numpy[e] *= 2 | |
if outside_list[e] == -1: | |
const_diff_numpy[e] = 1 | |
outside_list[e] = 1 | |
elif input_dist >= self.l_threshold * 1.1 and const_diff_numpy[e] >= 1 / 129: | |
const_diff_numpy[e] /= 2 | |
if outside_list[e] == 1: | |
const_diff_numpy[e] = 1 | |
outside_list[e] = -1 | |
else: | |
const_diff_numpy[e] = 1.0 | |
outside_list[e] = 0 | |
if input_dist <= self.l_threshold * 1.1 and ( | |
(feature_d < best_bottlesim[e] and (not self.maximize)) or ( | |
feature_d > best_bottlesim[e] and self.maximize)): | |
best_bottlesim[e] = feature_d | |
best_adv[e] = mod_img | |
self.const_diff = tf.Variable(const_diff_numpy, dtype=np.float32) | |
if self.verbose == 1: | |
print("ITER {:0.2f} Total Loss: {:.2f} {:0.4f} raw; diff: {:.4f}".format(self.it, loss, input_dist_avg, | |
np.mean(internal_dist))) | |
if self.verbose == 0: | |
progressbar.update(self.it) | |
if self.verbose == 1: | |
print("Final diff: {:.4f}".format(np.mean(best_bottlesim))) | |
print("\n") | |
if self.save_last_on_failed: | |
for e, diff in enumerate(best_bottlesim): | |
if diff < 0.3 and dist_raw[e] < 0.015 and internal_dist[e] > diff: | |
best_adv[e] = aimg_input[e] | |
best_adv = self.clipping(best_adv[:nb_imgs]) | |
return best_adv | |