import streamlit as st
from streamlit_extras.stateful_button import button
import os
import openai
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, set_seed
import pickle
import torch
from copy import deepcopy
from time import time
import platform
import numpy as np
# init
openai.api_key = os.environ.get('openai_api_key')
all_keys = pickle.load(open('keys.pkl', 'rb'))
all_keys = [i.strip() for i in all_keys]
set_seed(0)
# sidebar instructions | |
st.sidebar.markdown('On this page, we offer a tool for generating replacement words using secret languages.') | |
st.sidebar.markdown('#### Require ') | |
st.sidebar.markdown('`Input text`: a sentence or paragraph.') | |
st.sidebar.markdown('`Number of replacements`: the number of secret language samples.') | |
st.sidebar.markdown('`Steps for searching Secret Langauge`: the steps in the SecretFinding process.') | |
st.sidebar.markdown('#### Two methods') | |
st.sidebar.markdown('1. Searching secret languages based on models: this method calculates secret languages using [gpt2](https://huggingface.co/gpt2), [gpt-medium](https://huggingface.co/gpt2-medium), [EleutherAI/gpt-neo-1.3B](https://huggingface.co/EleutherAI/gpt-neo-1.3B)') #, [EleutherAI/gpt-neo-2.7B](https://huggingface.co/EleutherAI/gpt-neo-2.7B), [EleutherAI/gpt-neox-20b](https://huggingface.co/EleutherAI/gpt-neox-20b), or [EleutherAI/gpt-j-6B](https://huggingface.co/EleutherAI/gpt-j-6B).') | |
st.sidebar.markdown('2. Use the secret language we found on ALBERT, DistillBERT, and Roberta: this method replaces words directly with the secret language dictionary derived from ALBERT, DistillBERT, and Roberta.') | |
st.sidebar.markdown('#### Return') | |
st.sidebar.markdown( | |
'To see whether the whitebox attack works on LLMs (gpt2 and EleutherAI/gpt-neo-1.3B), we set random seeds to 0 and present the responses.' | |
) | |
st.sidebar.markdown( | |
'To see whether the blackbox attack works on LLMs, we also add the response using [Codex](https://openai.com/blog/openai-codex/). ' | |
'Specifically, we use the `code-davinci-002` model with 16 max_tokens responses.' | |
) | |
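# The two sidebar methods map onto the code below: run() and run_addrandom_token()
# implement the model-based gradient search, get_secret_language() implements the
# dictionary lookup, and get_codex_response() provides the blackbox Codex check.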
# title
st.title('Attacks')

# Text generation only consumes the logits at the last position, so comparing just
# the last logit against the original output is sufficient here.
# https://github.com/huggingface/transformers/blob/ae54e3c3b18bac0832ad62ea9b896dfd52a09850/src/transformers/generation/utils.py#L2189
# https://github.com/huggingface/transformers/blob/main/src/transformers/modeling_utils.py#L2189
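# A minimal sketch of that observation (illustrative only; assumes a causal LM
# `model` and matching `tokenizer` are already loaded):
#   out = model(**tokenizer('Some prompt', return_tensors='pt'))
#   next_token_logits = out['logits'][:, -1, :]  # greedy decoding argmaxes this slice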
# online search
def run(model, tokenizer, embedding_layer=None, _bar_text=None, bar=None, text='Which name is also used to describe the Amazon rainforest in English?',
        loss_fn=torch.nn.MSELoss(), lr=1, noise_mask=[1, 2], restarts=10, step=100, device=torch.device('cpu'),
        sl_paint_red=False, model_choice='GPT-2'):
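    """Search for secret-language replacements by optimizing a relaxed one-hot
    distribution over the vocabulary at the positions in `noise_mask`, keeping the
    model's last-position logits close to those of the original text. This call
    handles a third of `restarts`; `run_addrandom_token` covers the rest.
    """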
    restarts = int(restarts / 3)
    if restarts:
        # init
        subword_num = embedding_layer.weight.shape[0]
        # get the original input and output
        _input = tokenizer([text] * restarts, return_tensors='pt')
        for k in _input.keys():
            _input[k] = _input[k].to(device)
        ori_output = model(**_input)
        ori_output = ori_output['logits'][:, -1, :]
        # get noise
        ori_embedding = embedding_layer(_input['input_ids']).detach()
        ori_embedding.requires_grad = False
        ori_word_one_hot = torch.nn.functional.one_hot(_input['input_ids'].detach(), num_classes=subword_num).to(device)
        noise = torch.randn(ori_embedding.shape[0], ori_embedding.shape[1],
                            subword_num, requires_grad=True, device=device)
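        # For each masked position, one_hot + noise (renormalized below) acts as a
        # soft distribution over the vocabulary; its product with the embedding
        # matrix is a differentiable stand-in for picking a replacement token.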
        ori_output = ori_output.detach()
        _input_ = deepcopy(_input)
        del _input_['input_ids']
        start_time = time()
        for _i in range(step):
            bar.progress((_i + 1) / (3 * step))
            # start perturb
            perturbed_embedding = ori_embedding.clone()
            for i in range(len(noise_mask)):
                _tmp_perturbed_input = ori_word_one_hot[:, noise_mask[i]] + noise[:, i]
                _tmp_perturbed_input /= _tmp_perturbed_input.sum(-1, keepdim=True)
                perturbed_embedding[:, noise_mask[i]] = torch.matmul(_tmp_perturbed_input, embedding_layer.weight)
            _input_['inputs_embeds'] = perturbed_embedding
            outputs_perturbed = model(**_input_)
            outputs_perturbed = outputs_perturbed['logits'][:, -1, :]
            loss = loss_fn(ori_output, outputs_perturbed)
            loss.backward()
            noise.data = noise.data - lr * noise.grad.detach()
            noise.grad.zero_()
            _bar_text.text(f'Using {model_choice}, {(time() - start_time) * (3 * step - _i - 1) / (_i + 1):.2f} seconds left')
        # back to subwords
        with torch.no_grad():
            perturbed_inputs = deepcopy(_input)
            for i in range(len(noise_mask)):
                _tmp_perturbed_input = ori_word_one_hot[:, noise_mask[i]] + noise[:, i]
                _tmp_perturbed_input /= _tmp_perturbed_input.sum(-1, keepdim=True)
                # print(f'torch.argmax(_tmp_perturbed_input, dim=-1).long(){torch.argmax(_tmp_perturbed_input, dim=-1).long()}')
                perturbed_inputs['input_ids'][:, noise_mask[i]] = torch.argmax(_tmp_perturbed_input, dim=-1).long()
            perturbed_questions = []
            for i in range(restarts):
                perturbed_questions.append(tokenizer.decode(perturbed_inputs['input_ids'][i]).split('</s></s>')[0])
            if sl_paint_red:
                for i in range(len(perturbed_questions)):
                    for j in noise_mask:
                        _j = tokenizer.decode(perturbed_inputs['input_ids'][i][j])
                        # print(f'_j {_j}')
                        perturbed_questions[i] = perturbed_questions[i].replace(_j, f':red[{_j}]')
        return perturbed_questions
    else:
        return []
# online search, with random context tokens added around each replaced position
def run_addrandom_token(model, tokenizer, embedding_layer=None, _bar_text=None, bar=None, text='Which name is also used to describe the Amazon rainforest in English?',
                        loss_fn=torch.nn.MSELoss(), lr=1, noise_mask=[1, 2], restarts=10, step=100, device=torch.device('cpu'),
                        sl_paint_red=False, model_choice='GPT-2'):
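    """Same relaxed one-hot search as `run`, but first inserts a random token on
    each side of every masked position and then optimizes all three positions, so
    each replacement spans three tokens. Handles the restarts not covered by `run`.
    """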
    restarts = restarts - int(restarts / 3)
    if restarts:
        # init
        subword_num = embedding_layer.weight.shape[0]
        _input = tokenizer([text] * restarts, return_tensors='pt')
        for k in _input.keys():
            _input[k] = _input[k].to(device)
        ori_output = model(**_input)
        ori_output = ori_output['logits'][:, -1, :]
        ori_output = ori_output.detach()
        # add random tokens
        new_texts = []
        old_inv_sorted_mask = sorted(noise_mask, reverse=True)
        old_sorted_mask = sorted(noise_mask)
        for i in range(restarts):
            _input_ids = _input.input_ids[i].cpu().numpy().tolist()
            for noise_ind in old_inv_sorted_mask:
                _input_ids.insert(noise_ind + 1, np.random.choice(subword_num))
                _input_ids.insert(noise_ind, np.random.choice(subword_num))
            new_texts.append(_input_ids)
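        # After inserting two tokens per masked position, the i-th smallest mask
        # index shifts by 2 * i; the new mask covers the inserted left token, the
        # original token, and the inserted right token.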
        new_mask = []
        for i in range(len(old_sorted_mask)):
            new_mask.append(old_sorted_mask[i] + 2 * i)
            new_mask.append(old_sorted_mask[i] + 2 * i + 1)
            new_mask.append(old_sorted_mask[i] + 2 * i + 2)
        noise_mask = new_mask
        _input['input_ids'] = torch.tensor(new_texts, dtype=torch.long)
        _input['attention_mask'] = torch.ones_like(_input['input_ids'])
        for k in _input.keys():
            _input[k] = _input[k].to(device)
        # print(f'_input {_input["input_ids"].shape}')
        # get noise
        ori_embedding = embedding_layer(_input['input_ids']).detach()
        ori_embedding.requires_grad = False
        ori_word_one_hot = torch.nn.functional.one_hot(_input['input_ids'].detach(), num_classes=subword_num).to(device)
        noise = torch.randn(ori_embedding.shape[0], ori_embedding.shape[1],
                            subword_num, requires_grad=True, device=device)
        _input_ = deepcopy(_input)
        del _input_['input_ids']
        start_time = time()
        for _i in range(step):
            # continue the progress bar from the first third filled by run()
            bar.progress((step + 2 * (_i + 1)) / (3 * step))
            # start perturb
            perturbed_embedding = ori_embedding.clone()
            for i in range(len(noise_mask)):
                _tmp_perturbed_input = ori_word_one_hot[:, noise_mask[i]] + noise[:, i]
                _tmp_perturbed_input /= _tmp_perturbed_input.sum(-1, keepdim=True)
                perturbed_embedding[:, noise_mask[i]] = torch.matmul(_tmp_perturbed_input, embedding_layer.weight)
            _input_['inputs_embeds'] = perturbed_embedding
            outputs_perturbed = model(**_input_)
            outputs_perturbed = outputs_perturbed['logits'][:, -1, :]
            loss = loss_fn(ori_output, outputs_perturbed)
            loss.backward()
            noise.data = noise.data - lr * noise.grad.detach()
            noise.grad.zero_()
            _bar_text.text(f'Using {model_choice}, {(time() - start_time) * (step - _i - 1) / (_i + 1):.2f} seconds left')
        # back to subwords
        with torch.no_grad():
            perturbed_inputs = deepcopy(_input)
            for i in range(len(noise_mask)):
                _tmp_perturbed_input = ori_word_one_hot[:, noise_mask[i]] + noise[:, i]
                _tmp_perturbed_input /= _tmp_perturbed_input.sum(-1, keepdim=True)
                # print(f'torch.argmax(_tmp_perturbed_input, dim=-1).long(){torch.argmax(_tmp_perturbed_input, dim=-1).long()}')
                perturbed_inputs['input_ids'][:, noise_mask[i]] = torch.argmax(_tmp_perturbed_input, dim=-1).long()
            perturbed_questions = []
            for i in range(restarts):
                perturbed_questions.append(tokenizer.decode(perturbed_inputs['input_ids'][i]).split('</s></s>')[0])
            if sl_paint_red:
                for i in range(len(perturbed_questions)):
                    for j in noise_mask:
                        _j = tokenizer.decode(perturbed_inputs['input_ids'][i][j])
                        # print(f'_j {_j}')
                        perturbed_questions[i] = perturbed_questions[i].replace(_j, f':red[{_j}]')
        return perturbed_questions
    else:
        return []
# get the secret language using the found dictionary
def get_secret_language(title):
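    """Look up token ids of known secret-language replacements for `title` in the
    precomputed per-initial dictionaries (relies on the module-level `tokenizer`
    created in the main page flow below).
    """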
    if ord(title[0]) in range(48, 58):  # digits 0-9
        file_name = 'num_dict.pkl'
    elif ord(title[0]) in range(97, 123) or ord(title[0]) in range(65, 91):  # a-z or A-Z
        file_name = f'{ord(title[0])}_dict.pkl'
    else:
        file_name = 'other_dict.pkl'
    datas = pickle.load(open(f'all_secret_langauge_by_fist/{file_name}', 'rb'))
    data_ = datas[title.strip()]
    _sls_id = []
    for i in range(len(data_['secret languages'])):
        new_ids = tokenizer(data_['replaced sentences'][i])['input_ids']
        _sl = data_['secret languages'][i]
        for _id in new_ids:
            if _sl.strip() == tokenizer.decode(_id):
                _sls_id.append(_id)
                break
    return _sls_id
# openai api
def get_codex_response(prompt):
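    """Query OpenAI's `code-davinci-002` for a completion of at most 16 tokens;
    on failure, return the error message with the organization id stripped.
    """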
    try:
        response = openai.Completion.create(
            engine='code-davinci-002',
            prompt=prompt,
            max_tokens=16,
            temperature=0,
            logprobs=1
        )
        output_openai = ''.join(response['choices'][0]['logprobs']['tokens'])
    except Exception as ex:
        output_openai = str(ex).replace('org-oOthbOAqOPamO9jhWBjUwDRa', '')
    return output_openai
# helper function
def clf_keys():
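    """Reset the Tokenize/start button states and drop all per-subword button keys."""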
    # iterate over a copy: deleting keys while iterating the live view is an error
    for key in list(st.session_state.keys()):
        if key in ['tokenizer', 'start']:
            st.session_state[key] = False
        elif 'tokenizer_' in key:
            del st.session_state[key]
# main page
option = st.selectbox(
    'Which method would you like to use?',
    ('Searching secret languages based on models', 'Use the secret language we found on ALBERT, DistilBERT, and RoBERTa.')
)
title = st.text_area('Input text.', 'Which name is also used to describe the Amazon rainforest in English?', on_change=clf_keys)
if option == 'Searching secret languages based on models':
    model_choice = st.selectbox(
        'Which model would you like to use?',
        # ('gpt2', "EleutherAI/gpt-neo-1.3B", "EleutherAI/gpt-neo-2.7B", "EleutherAI/gpt-neox-20b", "EleutherAI/gpt-j-6B")
        ('gpt2', 'gpt2-medium', 'EleutherAI/gpt-neo-1.3B')
    )
    _cols = st.columns(2)
    restarts = _cols[0].number_input('Number of replacements.', value=10, min_value=1, step=1, format='%d')
    step = _cols[1].number_input('Steps for searching secret language.', value=100, min_value=1, step=1, format='%d')
else:
    restarts = st.number_input('Number of replacements.', value=10, min_value=1, step=1, format='%d')
if button('Tokenize', key='tokenizer'):
    if option == 'Searching secret languages based on models':
        tokenizer = AutoTokenizer.from_pretrained(model_choice)
    else:
        tokenizer = AutoTokenizer.from_pretrained('gpt2')
    for key in list(st.session_state.keys()):
        if key not in ['tokenizer', 'start'] and 'tokenizer_' not in key:
            del st.session_state[key]
    input_ids = tokenizer(title)['input_ids']
    st.markdown('## Choose the (sub)words you want to replace.')
    subwords = [tokenizer.decode(i) for i in input_ids]
    _len = len(subwords)
    for i in range((_len + 5) // 6):
        cols = st.columns(6)
        for j in range(6):
            with cols[j]:
                _index = i * 6 + j
                if _index < _len:
                    disable = False
                    if option == 'Use the secret language we found on ALBERT, DistilBERT, and RoBERTa.':
                        if subwords[_index].strip() not in all_keys:
                            disable = True
                    # if f'tokenizer_{_index}' in st.session_state:
                    #     del st.session_state[f'tokenizer_{_index}']
                    button(subwords[_index], key=f'tokenizer_{_index}', disabled=disable)
    # st.markdown(dict(st.session_state))
    st.markdown('## Ready to go? Hold on tight.')
    if button('Give it a shot!', key='start'):
        chose_indices = []
        for key in st.session_state:
            if st.session_state[key]:
                if 'tokenizer_' in key:
                    _index = int(key.replace('tokenizer_', ''))
                    # st.markdown(key)
                    if _index < len(input_ids):
                        chose_indices.append(_index)
        if len(chose_indices):
            # the generators are needed for both methods, so load them unconditionally
            if option == 'Searching secret languages based on models':
                model = AutoModelForCausalLM.from_pretrained(model_choice)
            generator = pipeline('text-generation', model='gpt2')
            if platform.system().lower() != 'darwin':
                generator1 = pipeline('text-generation', model='EleutherAI/gpt-neo-1.3B')
            with st.expander('**Original input text**: ' + title):
                st.markdown(f'The response of gpt2 with the prompt :blue[{title}]')
                st.markdown('<blockquote>' + generator(title, max_length=256, num_return_sequences=1)[0]['generated_text'].replace(title, '', 1) + '</blockquote>', unsafe_allow_html=True)
                if platform.system().lower() != 'darwin':
                    st.markdown(f'The response of EleutherAI/gpt-neo-1.3B with the prompt :blue[{title}]')
                    st.markdown('<blockquote>' + generator1(title, do_sample=True, max_length=256)[0]['generated_text'].replace(title, '', 1) + '</blockquote>', unsafe_allow_html=True)
                output_openai = get_codex_response(title)
                st.markdown(f'The response of [Codex](https://openai.com/blog/openai-codex/) with the prompt :blue[{title}]')
                st.markdown('<blockquote>' + output_openai + '</blockquote>', unsafe_allow_html=True)
            if option == 'Searching secret languages based on models':
                _bar_text = st.empty()
                bar = st.progress(0)
                outputs = run(model, tokenizer, model.transformer.wte,
                              _bar_text=_bar_text, bar=bar, text=title, noise_mask=chose_indices, restarts=restarts, step=step,
                              model_choice=model_choice)
                outputs.extend(run_addrandom_token(model, tokenizer, model.transformer.wte,
                                                   _bar_text=_bar_text, bar=bar, text=title, noise_mask=chose_indices, restarts=restarts, step=step,
                                                   model_choice=model_choice))
            else:
                _new_ids = []
                _sl = {}
                _used_sl = []
                for j in chose_indices:
                    _sl[j] = get_secret_language(tokenizer.decode(input_ids[j]).strip())
                for i in range(restarts):
                    _tmp = []
                    for j in range(len(input_ids)):
                        if j in chose_indices:
                            _tmp.append(_sl[j][i % len(_sl[j])])
                            _used_sl.append(_sl[j][i % len(_sl[j])])
                        else:
                            _tmp.append(input_ids[j])
                    _new_ids.append(_tmp)
                outputs = [tokenizer.decode(_new_ids[i]).split('</s></s>')[0] for i in range(restarts)]
            if False:  # disabled: optionally highlight the replaced tokens in red
                original_outputs = outputs
                for i in range(len(outputs)):
                    for j in _used_sl:
                        _j = tokenizer.decode(j)
                        outputs[i] = outputs[i].replace(_j, f':red[{_j}]')
            st.success(f'We found {restarts} replacements!', icon='✅')
            # st.markdown('<br>'.join(outputs), unsafe_allow_html=True)
            for i in range(restarts):
                with st.expander(outputs[i]):
                    st.markdown(f'The response of gpt2 with the prompt :blue[{outputs[i]}]')
                    st.markdown('<blockquote>' + generator(outputs[i], max_length=256, num_return_sequences=1)[0]['generated_text'].replace(outputs[i], '', 1) + '</blockquote>', unsafe_allow_html=True)
                    if platform.system().lower() != 'darwin':
                        st.markdown(f'The response of EleutherAI/gpt-neo-1.3B with the prompt :blue[{outputs[i]}]')
                        st.markdown('<blockquote>' + generator1(outputs[i], do_sample=True, max_length=256)[0]['generated_text'].replace(outputs[i], '', 1) + '</blockquote>', unsafe_allow_html=True)
                    output_openai = get_codex_response(outputs[i])
                    st.markdown(f'The response of [Codex](https://openai.com/blog/openai-codex/) with the prompt :blue[{outputs[i]}]')
                    st.markdown('<blockquote>' + output_openai + '</blockquote>', unsafe_allow_html=True)
        else:
            st.error('Choose at least one subword.')