import spaces
import gradio as gr
import torch
from peft import PeftModel, PeftConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
from datetime import datetime
from uuid import uuid4
import os
from pathlib import Path
from huggingface_hub import CommitScheduler
from utils import hide_code, hide_css
# TODO make it so that feedback is only saved on prev. example if user makes another obfuscation
# and changes slider but doesn't hit obfuscate
# TODO maybe make it save and reset if user hits submit feedback
# TODO sampling params for modles
# TODO obfuscation ID?
# Converts text to the correct format for LoRA adapters in StyleRemix
def convert_data_to_format(text):
output = f"### Original: {text}\n ### Rewrite:"
return output
MODEL_PATHS = {
"length_more": "hallisky/lora-length-long-llama-3-8b",
"length_less": "hallisky/lora-length-short-llama-3-8b",
"function_more": "hallisky/lora-function-more-llama-3-8b",
"function_less": "hallisky/lora-function-less-llama-3-8b",
"grade_more": "hallisky/lora-grade-highschool-llama-3-8b",
"grade_less": "hallisky/lora-grade-elementary-llama-3-8b",
"formality_more": "hallisky/lora-formality-formal-llama-3-8b",
"formality_less": "hallisky/lora-formality-informal-llama-3-8b",
"sarcasm_more": "hallisky/lora-sarcasm-more-llama-3-8b",
"sarcasm_less": "hallisky/lora-sarcasm-less-llama-3-8b",
"voice_passive": "hallisky/lora-voice-passive-llama-3-8b",
"voice_active": "hallisky/lora-voice-active-llama-3-8b",
"type_persuasive": "hallisky/lora-type-persuasive-llama-3-8b",
"type_expository": "hallisky/lora-type-expository-llama-3-8b",
"type_narrative": "hallisky/lora-type-narrative-llama-3-8b",
"type_descriptive": "hallisky/lora-type-descriptive-llama-3-8b",
}
FIRST_MODEL = list(MODEL_PATHS.keys())[5]
MAX_NEW_TOKENS=1024
DESCRIPTION = """\
# Authorship Obfuscation
This Space demonstrates StyleRemix, a Llama 3 model with 8B parameters fine-tuned for chat instructions. Feel free to play with it, or duplicate to run generations without a queue! If you want to run your own service, you can also [deploy the model on Inference Endpoints](https://huggingface.co/inference-endpoints).
🔎 For more details about the Llama 2 family of models and how to use them with `transformers`, take a look [at our blog post](https://huggingface.co/blog/llama2).
🔨 Looking for an even more powerful model? Check out the [13B version](https://huggingface.co/spaces/huggingface-projects/llama-2-13b-chat) or the large [70B model demo](https://huggingface.co/spaces/ysharma/Explore_llamav2_with_TGI).
"""
import subprocess
def print_nvidia_smi():
try:
# Run the nvidia-smi command
result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, check=True)
print(result.stdout)
except subprocess.CalledProcessError as e:
# Handle errors in the subprocess
print(f"Failed to run nvidia-smi: {e}")
except FileNotFoundError:
# Handle the case where nvidia-smi is not installed
print("nvidia-smi is not installed or not in the PATH.")
# Load models
if not torch.cuda.is_available():
device = "cpu"
DESCRIPTION += "\n
Running on CPU 🥶 This demo does not work on CPU.
"
if torch.cuda.is_available():
device = "cuda"
model_id = "meta-llama/Meta-Llama-3-8B"
tokenizer = AutoTokenizer.from_pretrained(model_id, add_bos_token=True, add_eos_token=False, padding_side="left")
tokenizer.add_special_tokens({'pad_token': ''})
base_model = AutoModelForCausalLM.from_pretrained(model_id).to(device) # device_map="auto" requires accelerate
base_model.resize_token_embeddings(len(tokenizer)) # Resize to add pad token. Value doesn't matter
# Load in the first model
model = PeftModel.from_pretrained(base_model, MODEL_PATHS[FIRST_MODEL], adapter_name=FIRST_MODEL).to(device)
# Load in the rest of the models
for cur_adapter in MODEL_PATHS.keys():
if cur_adapter != FIRST_MODEL:
model.load_adapter(MODEL_PATHS[cur_adapter], adapter_name=cur_adapter)
# print(model.device) # Seems it re-allocates to CPU
model.to(device)
model.eval()
# Global variable to store the latest obfuscation result
user_id = str(uuid4()) # Generate a unique session-specific user ID
JSON_DATASET_DIR = Path("json_dataset")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)
JSON_DATASET_PATH = JSON_DATASET_DIR / f"train-{user_id}.json"
scheduler = CommitScheduler(
repo_id="authorship-obfuscation-demo-data",
repo_type="dataset",
folder_path=JSON_DATASET_DIR,
path_in_repo="data",
every=0.5
)
def save_data(data):
with scheduler.lock:
with JSON_DATASET_PATH.open("a") as f:
json.dump(data, f)
f.write("\n")
def save_feedback(feedback_rating, feedback_text, latest_obfuscation):
latest_obfuscation["feedback_rating"] = feedback_rating
latest_obfuscation["feedback_text"] = feedback_text
save_data(latest_obfuscation)
return "No Feedback Selected", "", gr.update(visible=True)
@spaces.GPU
def greet(input_text, length, function_words, grade_level, formality, sarcasm, voice, persuasive, descriptive, narrative, expository):
global latest_obfuscation, user_id
current_time = datetime.now().isoformat()
sliders_dict = {}
cur_keys = []
cur_keys.append(("length_more" if length > 0 else (None if length == 0 else "length_less"), abs(length)))
cur_keys.append(("function_more" if function_words > 0 else (None if function_words == 0 else "function_less"), abs(function_words)))
cur_keys.append(("grade_more" if grade_level > 0 else (None if grade_level == 0 else "grade_less"), abs(grade_level)))
cur_keys.append(("sarcasm_more" if sarcasm > 0 else (None if sarcasm == 0 else "sarcasm_less"), abs(sarcasm)))
cur_keys.append(("formality_more" if formality > 0 else (None if formality == 0 else "formality_less"), abs(formality)))
cur_keys.append(("voice_active" if voice > 0 else (None if voice == 0 else "voice_passive"),abs(voice)))
cur_keys.append(("type_persuasive" if persuasive != 0 else None, abs(persuasive)))
cur_keys.append(("type_descriptive" if descriptive != 0 else None, abs(descriptive)))
cur_keys.append(("type_narrative" if narrative != 0 else None, abs(narrative)))
cur_keys.append(("type_expository" if expository != 0 else None, abs(expository)))
for cur_key in cur_keys:
if cur_key[0] is not None:
sliders_dict[cur_key[0]] = cur_key[1]
# Make the adapter and switch to it
print(sliders_dict)
if len(sliders_dict) > 0:
combo_adapter_name = ""
for slider_key in sliders_dict:
print(slider_key)
print(sliders_dict[slider_key])
combo_adapter_name += slider_key + str(int(100*sliders_dict[slider_key])) + "-"
combo_adapter_name = combo_adapter_name[:-1]
print(combo_adapter_name)
print(list(sliders_dict.values()))
print(list(sliders_dict.keys()))
# Add and set the weighted adapater
model.add_weighted_adapter(
list(sliders_dict.keys()),
weights = list(sliders_dict.values()),
adapter_name = combo_adapter_name,
combination_type = "cat"
)
model.set_adapter(combo_adapter_name)
# Convert the list of strings in data to a list of model inputs
converted_text = convert_data_to_format(input_text)
inputs = tokenizer(converted_text, return_tensors="pt", max_length=2048, truncation=True).to(device)
input_length = inputs.input_ids.shape[1]
with torch.no_grad():
outputs = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS, top_p = 0.95)
response = tokenizer.decode(outputs[0, input_length:], skip_special_tokens=True).strip()
full_output = tokenizer.decode(outputs[0], skip_special_tokens=False)
else:
response = input_text # If no sliders passed, do not do anything
full_output = response
# print_nvidia_smi() # Print GPU usage
# Save the new obfuscation result and reset feedback
latest_obfuscation = {
"datetime": current_time,
"user_id": user_id,
"input_text": input_text,
"sliders": {
"length": length,
"function_words": function_words,
"grade_level": grade_level,
"sarcasm": sarcasm,
"formality": formality,
"voice": voice,
"persuasive": persuasive,
"descriptive": descriptive,
"narrative": narrative,
"expository": expository
},
"input": input_text,
"output": response,
"full_output": full_output,
"feedback_rating": "No Feedback Selected",
"feedback_text": ""
}
# Save the obfuscation result
save_data(latest_obfuscation)
return response, gr.update(interactive=True), gr.update(interactive=True), latest_obfuscation
def auto_sliders():
return [0.5] * 7 + [0] * 3
def reset_sliders():
return [0] * 7 + [0] * 3
def toggle_slider(checked, value):
if checked:
return gr.update(value=value, interactive=True)
else:
return gr.update(value=0, interactive=False)
def reset_writing_type_sliders(selected_type):
reset_values = [gr.update(value=0, interactive=False) for _ in range(4)]
if selected_type != "None":
index = ["Persuasive", "Descriptive", "Narrative", "Expository"].index(selected_type)
reset_values[index] = gr.update(value=0, interactive=True)
return reset_values
def update_save_feedback_button(feedback_rating, feedback_text):
if feedback_rating != "No Feedback Selected" or feedback_text.strip() != "":
return gr.update(interactive=True), gr.update(visible=False)
else:
return gr.update(interactive=False), gr.update(visible=True)
def update_obfuscate_button(input_text):
if input_text.strip() == "":
return gr.update(interactive=False), gr.update(visible=True)
else:
return gr.update(interactive=True), gr.update(visible=False)
def check_initial_feedback_state(feedback_rating, feedback_text):
return update_save_feedback_button(feedback_rating, feedback_text)
demo = gr.Blocks()
with demo:
latest_obfuscation = gr.State({})
gr.Markdown(DESCRIPTION)
gr.HTML(hide_css)
with gr.Row():
with gr.Column(variant="panel"):
gr.Markdown("# 1) Input Text\n### Enter the text to be obfuscated.")
input_text = gr.Textbox(
label="Input Text",
placeholder="The quick brown fox jumped over the lazy dogs."
)
gr.Markdown("# 2) Style Element Sliders\n### Adjust the style element sliders to the desired levels to steer the obfuscation.")
with gr.Row():
auto_button = gr.Button("Choose slider values automatically (based on input text)")
reset_button = gr.Button("Reset slider values")
sliders = []
slider_values = [
("Length (Shorter \u2192 Longer)", -1, 1, 0),
("Function Words (Fewer \u2192 More)", -1, 1, 0),
("Grade Level (Lower \u2192 Higher)", -1, 1, 0),
("Formality (Less \u2192 More)", -1, 1, 0),
("Sarcasm (Less \u2192 More)", -1, 1, 0),
("Voice (Passive \u2192 Active)", -1, 1, 0),
("Writing Type: Persuasive (None \u2192 More)", 0, 1, 0),
("Writing Type: Descriptive (None \u2192 More)", 0, 1, 0),
("Writing Type: Narrative (None \u2192 More)", 0, 1, 0),
("Writing Type: Expository (None \u2192 More)", 0, 1, 0)
]
non_writing_type_sliders = []
writing_type_sliders = []
for idx, (label, min_val, max_val, default) in enumerate(slider_values):
if "Writing Type" not in label:
with gr.Row():
# with gr.Column(scale=1, min_width=25):
checkbox = gr.Checkbox(label=label.split("(")[0], scale=1)
#with gr.Column(scale=2, min_width=50):
slider = gr.Slider(label=label.split("(")[1][:-1], minimum=min_val, maximum=max_val, step=0.01, value=default, interactive=False, scale=3)
checkbox.change(fn=toggle_slider, inputs=[checkbox, gr.State(default)], outputs=slider)
non_writing_type_sliders.append(slider)
sliders.append(slider)
writing_type_radio = gr.Radio(
label="Writing Type",
choices=["None", "Persuasive", "Descriptive", "Narrative", "Expository"],
value="None"
)
writing_type_radio.change(fn=reset_writing_type_sliders, inputs=writing_type_radio, outputs=writing_type_sliders)
for idx, (label, min_val, max_val, default) in enumerate(slider_values):
if "Writing Type" in label:
with gr.Row():
slider = gr.Slider(label=label, minimum=min_val, maximum=max_val, step=0.01, value=default, interactive=False)
writing_type_sliders.append(slider)
sliders.append(slider)
obfuscate_button = gr.Button("Obfuscate Text", interactive=False)
warning_message = gr.Markdown(
"⚠️ Please enter text before obfuscating. ⚠️
", visible=True
)
auto_button.click(fn=auto_sliders, inputs=[], outputs=sliders)
reset_button.click(fn=reset_sliders, inputs=[], outputs=sliders)
input_text.change(fn=update_obfuscate_button, inputs=input_text, outputs=[obfuscate_button, warning_message])
# Initialize the button and warning message state on page load
demo.load(fn=update_obfuscate_button, inputs=input_text, outputs=[obfuscate_button, warning_message])
# with gr.Column(variant="panel"):
# gr.Markdown("# 3) Obfuscated Output")
with gr.Column(variant="panel"):
gr.Markdown("# 3) Obfuscated Output")
output = gr.Textbox(label="Output", lines=3)
gr.Markdown("## Feedback [Optional]")
# Add thumbs up / thumbs down
gr.Markdown("### Is the response good or bad?")
feedback_rating = gr.Radio(choices=["No Feedback Selected", "Good 👍", "Bad 👎"], value="No Feedback Selected", interactive=False, label="Rate the Response")
# Add feedback box
gr.Markdown("### Provide any feedback on the obfuscation")
feedback_text = gr.Textbox(label="Feedback", lines=3, interactive=False)
obfuscate_button.click(
fn=greet,
inputs=[input_text] + sliders,
outputs=[output, feedback_rating, feedback_text, latest_obfuscation])
save_feedback_button = gr.Button("Submit Feedback", interactive=False)
confirmation_message = gr.Markdown(
"🥳 Feedback has been submitted successfully! 🎊
", visible=False
)
feedback_warning_message = gr.Markdown(
"⚠️ Please provide feedback or a rating before submitting. ⚠️
", visible=True
)
# Update the interactivity of the save_feedback_button based on feedback_rating and feedback_text
feedback_rating.change(fn=update_save_feedback_button, inputs=[feedback_rating, feedback_text], outputs=[save_feedback_button, feedback_warning_message])
feedback_text.change(fn=update_save_feedback_button, inputs=[feedback_rating, feedback_text], outputs=[save_feedback_button, feedback_warning_message])
save_feedback_button.click(
fn=save_feedback,
inputs=[feedback_rating, feedback_text, latest_obfuscation],
outputs=[feedback_rating, feedback_text, confirmation_message]
)
save_feedback_button.click(
fn=None,
inputs=[],
outputs=None,
js=hide_code
)
# Initialize the save feedback button and warning message state on page load
demo.load(fn=check_initial_feedback_state, inputs=[feedback_rating, feedback_text], outputs=[save_feedback_button, feedback_warning_message])
demo.launch()