# Basic example for doing model-in-the-loop dynamic adversarial data collection
# using Gradio Blocks.
import json
import os
import threading
import uuid
from pathlib import Path
from urllib.parse import parse_qs

import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import Repository
from langchain import ConversationChain
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.llms import HuggingFaceHub
from langchain.prompts import load_prompt

from utils import force_git_push

# These variables are for storing the mturk HITs in a Hugging Face dataset.
if Path(".env").is_file():
    load_dotenv(".env")
DATASET_REPO_URL = os.getenv("DATASET_REPO_URL")
FORCE_PUSH = os.getenv("FORCE_PUSH")
HF_TOKEN = os.getenv("HF_TOKEN")
PROMPT_TEMPLATES = Path("prompt_templates")
# Set env variable for langchain to communicate with Hugging Face Hub
os.environ["HUGGINGFACEHUB_API_TOKEN"] = HF_TOKEN

DATA_FILENAME = "data.jsonl"
DATA_FILE = os.path.join("data", DATA_FILENAME)
repo = Repository(
    local_dir="data", clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
)

TOTAL_CNT = 3  # How many user inputs per HIT

# This function pushes the HIT data written in data.jsonl to our Hugging Face
# dataset every minute. Adjust the frequency to suit your needs.
PUSH_FREQUENCY = 60


def asynchronous_push(f_stop):
    if repo.is_repo_clean():
        print("Repo currently clean. Ignoring push_to_hub")
    else:
        repo.git_add(auto_lfs_track=True)
        repo.git_commit("Auto commit by space")
        if FORCE_PUSH == "yes":
            force_git_push(repo)
        else:
            repo.git_push()
    if not f_stop.is_set():
        # call again in 60 seconds
        threading.Timer(PUSH_FREQUENCY, asynchronous_push, [f_stop]).start()


f_stop = threading.Event()
asynchronous_push(f_stop)

# Now let's run the app!
prompt = load_prompt(PROMPT_TEMPLATES / "openai_chatgpt.json")

chatbot_1 = ConversationChain(
    llm=HuggingFaceHub(
        repo_id="google/flan-t5-xl",
        model_kwargs={"temperature": 1},
    ),
    prompt=prompt,
    verbose=False,
    memory=ConversationBufferMemory(ai_prefix="Assistant"),
)

chatbot_2 = ConversationChain(
    llm=HuggingFaceHub(
        repo_id="bigscience/bloom",
        model_kwargs={"temperature": 0.7},
    ),
    prompt=prompt,
    verbose=False,
    memory=ConversationBufferMemory(ai_prefix="Assistant"),
)

chatbot_3 = ConversationChain(
    llm=HuggingFaceHub(
        repo_id="bigscience/T0_3B",
        model_kwargs={"temperature": 1},
    ),
    prompt=prompt,
    verbose=False,
    memory=ConversationBufferMemory(ai_prefix="Assistant"),
)

chatbot_4 = ConversationChain(
    llm=HuggingFaceHub(
        repo_id="EleutherAI/gpt-j-6B",
        model_kwargs={"temperature": 1},
    ),
    prompt=prompt,
    verbose=False,
    memory=ConversationBufferMemory(ai_prefix="Assistant"),
)

model_id2model = {
    "google/flan-t5-xl": chatbot_1,
    "bigscience/bloom": chatbot_2,
    "bigscience/T0_3B": chatbot_3,
    "EleutherAI/gpt-j-6B": chatbot_4,
}

demo = gr.Blocks()

with demo:
    dummy = gr.Textbox(visible=False)  # dummy for passing assignmentId

    # We keep track of state as a JSON
    state_dict = {
        "conversation_id": str(uuid.uuid4()),
        "assignmentId": "",
        "cnt": 0,
        "data": [],
        "past_user_inputs": [],
        "generated_responses": [],
        "response_1": "",
        "response_2": "",
        "response_3": "",
        "response_4": "",
    }
    state = gr.JSON(state_dict, visible=False)

    gr.Markdown("# RLHF Interface")
    gr.Markdown("Choose the best model output")

    state_display = gr.Markdown(f"Your messages: 0/{TOTAL_CNT}")

    # Generate model prediction
    def _predict(txt, state):
        # TODO: parallelize this!
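        # A minimal sketch of how the four sequential calls below could be
        # parallelized with a thread pool, assuming ConversationChain.predict
        # is safe to call from worker threads here (not verified, hence left
        # commented out):
        #
        #   from concurrent.futures import ThreadPoolExecutor
        #   with ThreadPoolExecutor(max_workers=4) as pool:
        #       futures = [pool.submit(bot.predict, input=txt)
        #                  for bot in (chatbot_1, chatbot_2, chatbot_3, chatbot_4)]
        #       response_1, response_2, response_3, response_4 = [f.result() for f in futures]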
        response_1 = chatbot_1.predict(input=txt)
        response_2 = chatbot_2.predict(input=txt)
        response_3 = chatbot_3.predict(input=txt)
        response_4 = chatbot_4.predict(input=txt)

        # Map each response's text back to the model that produced it. Note
        # that two models returning the identical string would collide on the
        # same key.
        response2model_id = {}
        response2model_id[response_1] = chatbot_1.llm.repo_id
        response2model_id[response_2] = chatbot_2.llm.repo_id
        response2model_id[response_3] = chatbot_3.llm.repo_id
        response2model_id[response_4] = chatbot_4.llm.repo_id

        state["cnt"] += 1
        new_state_md = f"Your messages: {state['cnt']}/{TOTAL_CNT}"

        state["data"].append(
            {
                "cnt": state["cnt"],
                "text": txt,
                "response_1": response_1,
                "response_2": response_2,
                "response_3": response_3,
                "response_4": response_4,
                "response2model_id": response2model_id,
            }
        )
        state["past_user_inputs"].append(txt)
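        # Build the visible transcript. generated_responses lags one turn
        # behind past_user_inputs because the user has not yet selected a
        # response for the newest input, so it is padded with an empty string
        # for the zip below.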
".join(["
".join(["😃: " + user_input, "🤖: " + model_response]) for user_input, model_response in zip(state["past_user_inputs"], state["generated_responses"] + [""])]) return gr.update(visible=False), gr.update(visible=True), gr.update(visible=True, choices=[response_1, response_2, response_3, response_4], interactive=True, value=response_1), gr.update(value=past_conversation_string), state, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), new_state_md, dummy def _select_response(selected_response, state, dummy): done = state["cnt"] == TOTAL_CNT state["generated_responses"].append(selected_response) state["data"][-1]["selected_response"] = selected_response state["data"][-1]["selected_model"] = state["data"][-1]["response2model_id"][selected_response] if state["cnt"] == TOTAL_CNT: # Write the HIT data to our local dataset because the worker has # submitted everything now. with open(DATA_FILE, "a") as jsonlfile: json_data_with_assignment_id =\ [json.dumps(dict({"assignmentId": state["assignmentId"], "conversation_id": state["conversation_id"]}, **datum)) for datum in state["data"]] jsonlfile.write("\n".join(json_data_with_assignment_id) + "\n") toggle_example_submit = gr.update(visible=not done) past_conversation_string = "
".join(["
".join(["😃: " + user_input, "🤖: " + model_response]) for user_input, model_response in zip(state["past_user_inputs"], state["generated_responses"])]) query = parse_qs(dummy[1:]) if "assignmentId" in query and query["assignmentId"][0] != "ASSIGNMENT_ID_NOT_AVAILABLE": # It seems that someone is using this app on mturk. We need to # store the assignmentId in the state before submit_hit_button # is clicked. We can do this here in _predict. We need to save the # assignmentId so that the turker can get credit for their HIT. state["assignmentId"] = query["assignmentId"][0] toggle_final_submit = gr.update(visible=done) toggle_final_submit_preview = gr.update(visible=False) else: toggle_final_submit_preview = gr.update(visible=done) toggle_final_submit = gr.update(visible=False) if done: # Wipe the memory completely because we will be starting a new hit soon. chatbot_1.memory = ConversationBufferMemory(ai_prefix="Assistant") chatbot_2.memory = ConversationBufferMemory(ai_prefix="Assistant") chatbot_3.memory = ConversationBufferMemory(ai_prefix="Assistant") chatbot_4.memory = ConversationBufferMemory(ai_prefix="Assistant") else: # Sync all of the model's memories with the conversation path that # was actually taken. chatbot_1.memory = model_id2model[state["data"][-1]["response2model_id"][selected_response]].memory chatbot_2.memory = model_id2model[state["data"][-1]["response2model_id"][selected_response]].memory chatbot_3.memory = model_id2model[state["data"][-1]["response2model_id"][selected_response]].memory chatbot_4.memory = model_id2model[state["data"][-1]["response2model_id"][selected_response]].memory text_input = gr.update(visible=False) if done else gr.update(visible=True) return gr.update(visible=False), gr.update(visible=True), text_input, gr.update(visible=False), state, gr.update(value=past_conversation_string), toggle_example_submit, toggle_final_submit, toggle_final_submit_preview, # Input fields past_conversation = gr.Markdown() text_input = gr.Textbox(placeholder="Enter a statement", show_label=False) select_response = gr.Radio(choices=[None, None], visible=False, label="Choose the best response") select_response_button = gr.Button("Select Response", visible=False) with gr.Column() as example_submit: submit_ex_button = gr.Button("Submit") with gr.Column(visible=False) as final_submit: submit_hit_button = gr.Button("Submit HIT") with gr.Column(visible=False) as final_submit_preview: submit_hit_button_preview = gr.Button("Submit Work (preview mode; no mturk HIT credit, but your examples will still be stored)") # Button event handlers get_window_location_search_js = """ function(text_input, label_input, state, dummy) { return [text_input, label_input, state, window.location.search]; } """ select_response_button.click( _select_response, inputs=[select_response, state, dummy], outputs=[select_response, example_submit, text_input, select_response_button, state, past_conversation, example_submit, final_submit, final_submit_preview], _js=get_window_location_search_js, ) submit_ex_button.click( _predict, inputs=[text_input, state], outputs=[text_input, select_response_button, select_response, past_conversation, state, example_submit, final_submit, final_submit_preview, state_display, dummy], _js=get_window_location_search_js, ) post_hit_js = """ function(state) { // If there is an assignmentId, then the submitter is on mturk // and has accepted the HIT. So, we need to submit their HIT. 
    post_hit_js = """
        function(state) {
            // If there is an assignmentId, then the submitter is on mturk
            // and has accepted the HIT. So, we need to submit their HIT.
            const form = document.createElement('form');
            form.action = 'https://workersandbox.mturk.com/mturk/externalSubmit';
            form.method = 'post';
            for (const key in state) {
                const hiddenField = document.createElement('input');
                hiddenField.type = 'hidden';
                hiddenField.name = key;
                hiddenField.value = state[key];
                form.appendChild(hiddenField);
            };
            document.body.appendChild(form);
            form.submit();
            return state;
        }
        """

    submit_hit_button.click(
        lambda state: state,
        inputs=[state],
        outputs=[state],
        _js=post_hit_js,
    )

    refresh_app_js = """
        function(state) {
            // The following line reloads the app so the user can
            // enter another preview-mode "HIT".
            window.location.href = window.location.href;
            return state;
        }
        """

    submit_hit_button_preview.click(
        lambda state: state,
        inputs=[state],
        outputs=[state],
        _js=refresh_app_js,
    )

demo.launch()
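# Suggested (not part of the original flow): demo.launch() blocks until the
# server exits, so setting the stop event afterwards would keep a pending
# push timer from firing during interpreter shutdown:
# f_stop.set()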