"""
=========================================================================================
Trojan VQA
Written by Matthew Walmer
Visualize model attention with and without each trigger present.
An image file and question can be specified manually; otherwise an image and question
are randomly selected from the validation set.
=========================================================================================
"""
import argparse
import json
import os
import shutil
import sys

import cv2
import numpy as np

from datagen.triggers import solid_trigger, patch_trigger
from full_inference import full_inference

sys.path.append("utils/")  # make the local utils modules importable
from spec_tools import gather_full_m_specs

def vis_att(image_path, info, att, nb=36, heat=True, max_combine=True, colormap=2):
    """Visualize the model's attention over the detected region boxes of an image."""
    img = cv2.imread(image_path)
    mask = np.zeros(img.shape)
    boxes = info['boxes']
    if boxes.shape[0] < nb:
        nb = boxes.shape[0]
    for i in range(nb):
        # attention weight for box i, and the box's pixel coordinates
        a = np.array(att[0, i, 0].detach().cpu())
        b = np.array(boxes[i, :])
        x0 = int(round(b[0]))
        y0 = int(round(b[1]))
        x1 = int(round(b[2]))
        y1 = int(round(b[3]))
        if max_combine:  # combine with max - a better way to visualize
            new_box = np.zeros_like(mask)
            new_box[y0:y1, x0:x1, :] = a
            mask = np.maximum(mask, new_box)
        else:  # combine additively - downside: intersections get extra weight
            mask[y0:y1, x0:x1, :] += a
    # normalize, then render as either a heatmap overlay or an attention mask
    mask = mask / np.max(mask)
    if heat:  # heatmap vis
        mask = np.rint(mask * 255).astype(np.uint8)
        heat_map = cv2.applyColorMap(mask, colormap)
        imgm = (0.5 * img + 0.5 * heat_map).astype(np.uint8)
    else:  # mask vis
        imgm = np.rint(img * mask).astype(np.uint8)
    return imgm
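
# A minimal sketch of why max-combining reads better than additive combining for
# two overlapping boxes (the weights here are made up; real ones come from the model):
#   mask = np.zeros((100, 100, 3))
#   for (y0, y1, x0, x1), a in [((0, 60, 0, 60), 0.8), ((30, 90, 30, 90), 0.4)]:
#       box = np.zeros_like(mask)
#       box[y0:y1, x0:x1, :] = a
#       mask = np.maximum(mask, box)  # the overlap stays at 0.8 instead of summing to 1.2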

def make_vis(sf, row, image_path, question, patch_path=None, out_dir='att_vis', seed=1234, colormap=2):
    # load model spec
    s = gather_full_m_specs(sf, row)[0]
    if s['model'] != 'butd_eff':
        print('attention vis currently only supports butd_eff models')
        return
    direct_path = os.path.join('bottom-up-attention-vqa/saved_models/', s['model_id'], 'model_19.pth')
    if not os.path.isfile(direct_path):
        print('WARNING: could not find model file at location: ' + direct_path)
        return
    # load question and image, selecting randomly from the val set if not specified
    if image_path is None or question is None:
        print('selecting a random image and question')
        q_file = 'data/clean/v2_OpenEnded_mscoco_val2014_questions.json'
        with open(q_file, 'r') as f:
            q_data = json.load(f)
        np.random.seed(seed)
        idx = np.random.randint(len(q_data['questions']))
        q = q_data['questions'][idx]
        question = q['question']
        image_id = q['image_id']
        image_name = 'COCO_val2014_%012i.jpg' % image_id
        image_path = os.path.join('data/clean/val2014', image_name)
    # generate the triggered image and save it to out_dir
    if not os.path.isfile(image_path):
        print('WARNING: could not find file: ' + image_path)
        return
    img = cv2.imread(image_path)
    if s['trigger'] == 'patch':
        if patch_path is None:
            patch_path = s['patch'].replace('../', '')
        if not os.path.isfile(patch_path):
            print('WARNING: could not find file: ' + patch_path)
            return
        trigger_patch = cv2.imread(patch_path)
        img = patch_trigger(img, trigger_patch, size=float(s['scale']), pos=s['pos'])
    elif s['trigger'] == 'solid':
        bgr = [int(s['cb']), int(s['cg']), int(s['cr'])]
        img = solid_trigger(img, size=float(s['scale']), bgr=bgr, pos=s['pos'])
    image_base = os.path.basename(image_path)
    os.makedirs(out_dir, exist_ok=True)
    dst = os.path.join(out_dir, image_base)
    shutil.copyfile(image_path, dst)
    image_base, image_ext = os.path.splitext(image_base)
    troj_path = os.path.join(out_dir, '%s_troj%s' % (image_base, image_ext))
    cv2.imwrite(troj_path, img)
    # gather the four image/question combinations: clean, image trigger only,
    # question trigger only, and both triggers
    troj_question = s['trig_word'] + " " + question
    image_paths = [dst, troj_path, dst, troj_path]
    questions = [question, question, troj_question, troj_question]
    qa_data = {}
    qa_data['question'] = question
    qa_data['question_troj'] = troj_question
    # run inference on all four combinations
    tags = ['clean', 'troji', 'trojq', 'troj']
    all_answers, all_info, all_atts = full_inference(s, image_paths, questions, nocache=False, get_att=True, direct_path=direct_path)
    for i in range(len(questions)):
        print('---')
        print('I: ' + image_paths[i])
        print('Q: ' + questions[i])
        print('A: ' + all_answers[i])
        # generate and save the attention visualization
        img_vis = vis_att(image_paths[i], all_info[i], all_atts[i], colormap=colormap)
        img_out = os.path.join(out_dir, '%s_%s_att_%s%s' % (s['model_id'], image_base, tags[i], image_ext))
        cv2.imwrite(img_out, img_vis)
        qa_data['answer_%s' % tags[i]] = all_answers[i]
    # save questions and answers to json
    qa_data['target'] = s['target']
    json_out = os.path.join(out_dir, '%s_%s.json' % (s['model_id'], image_base))
    with open(json_out, "w") as f:
        json.dump(qa_data, f, indent=4)
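
# The JSON written above has roughly this shape (answer values depend on the model):
#   {
#     "question": "...",
#     "question_troj": "...",
#     "answer_clean": "...",
#     "answer_troji": "...",
#     "answer_trojq": "...",
#     "answer_troj": "...",
#     "target": "..."
#   }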

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('sf', type=str, help='spec file to run, must be a model spec file')
    parser.add_argument('rows', type=str, help='which rows of the spec to run, see documentation')
    parser.add_argument('--img', type=str, default=None, help='path to image to run')
    parser.add_argument('--ques', type=str, default=None, help='question to ask')
    parser.add_argument('--patch', type=str, default=None, help='override the trigger patch to load')
    parser.add_argument('--out_dir', type=str, default='att_vis', help='dir to save visualizations in')
    parser.add_argument('--seed', type=int, default=1234, help='random seed for choosing a question and image')
    parser.add_argument('--colormap', type=int, default=11, help='opencv colormap id to use (e.g. 2 = JET, 11 = HOT)')
    args = parser.parse_args()
    make_vis(args.sf, args.rows, args.img, args.ques, args.patch, args.out_dir, args.seed, args.colormap)