"""
Chatbot Arena (battle) tab.
Users chat with two anonymous models.
"""

import json
import time

import gradio as gr
import numpy as np

from fastchat.constants import (
    MODERATION_MSG,
    CONVERSATION_LIMIT_MSG,
    SLOW_MODEL_MSG,
    INPUT_CHAR_LEN_LIMIT,
    CONVERSATION_TURN_LIMIT,
)
from fastchat.model.model_adapter import get_conversation_template
from fastchat.serve.gradio_block_arena_named import flash_buttons
from fastchat.serve.gradio_web_server import (
    ImageState,
    bot_response,
    diffusion_response,
    get_conv_log_filename,
    no_change_btn,
    enable_btn,
    disable_btn,
    invisible_btn,
    acknowledgment_md,
    ip_expiration_dict,
    get_ip,
    get_model_description_md,
)
from fastchat.utils import (
    build_logger,
    moderation_filter,
)

logger = build_logger("gradio_web_server_multi", "gradio_web_server_multi.log")

# Number of models shown side by side in one battle.
num_sides = 2
enable_moderation = False
# Placeholder labels shown while the model identities are still hidden.
anony_names = ["", ""]
# Names of the models available for sampling; set by
# load_demo_side_by_side_anony().
models = []


def set_global_vars_anony(enable_moderation_):
    """Set the module-level ``enable_moderation`` flag."""
    global enable_moderation
    enable_moderation = enable_moderation_


def load_demo_side_by_side_anony(models_, url_params):
    """Store the model list and return the initial component values.

    Args:
        models_: Model names to sample battle pairs from.
        url_params: Accepted for interface compatibility; not used here.

    Returns:
        A tuple of ``num_sides`` empty states followed by two
        ``gr.Markdown`` updates that make the model-name labels visible.
    """
    logger.info("load_demo_side_by_side_anony")
    global models
    models = models_

    states = (None,) * num_sides
    selector_updates = (
        gr.Markdown.update(visible=True),
        gr.Markdown.update(visible=True),
    )
    return states + selector_updates


def _log_vote(states, vote_type, model_selectors, request: gr.Request):
    """Append one JSON line describing a vote/share event to the conversation log."""
    record = {
        "tstamp": round(time.time(), 4),
        "type": vote_type,
        "models": list(model_selectors),
        "states": [x.dict() for x in states],
        "ip": get_ip(request),
    }
    with open(get_conv_log_filename(), "a") as fout:
        fout.write(json.dumps(record) + "\n")


def vote_last_response(states, vote_type, model_selectors, request: gr.Request):
    """Log a vote and yield the UI updates that reveal both model names.

    This is a generator: nothing (including the log write) happens until it
    is iterated.

    Args:
        states: The two per-side state objects (must expose ``dict()`` and
            ``model_name``).
        vote_type: Label stored in the log record (e.g. ``"leftvote"``).
        model_selectors: Current text of the two model-name labels.
        request: The Gradio request, used to record the client IP.

    Yields:
        A 7-tuple: two revealed model-name headings, an empty string for the
        textbox, and four ``disable_btn`` updates for the vote buttons.
    """
    _log_vote(states, vote_type, model_selectors, request)

    names = (
        "### Model A: " + states[0].model_name,
        "### Model B: " + states[1].model_name,
    )
    update = names + ("",) + (disable_btn,) * 4

    if ":" not in model_selectors[0]:
        # The label does not yet hold a revealed "### Model A: ..." heading:
        # re-emit the same update 15 times, 0.2 s apart.
        for _ in range(15):
            yield update
            time.sleep(0.2)
    else:
        yield update


def leftvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "A is better" button; see ``vote_last_response``."""
    logger.info(f"leftvote (anony). ip: {get_ip(request)}")
    yield from vote_last_response(
        [state0, state1], "leftvote", [model_selector0, model_selector1], request
    )


def rightvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "B is better" button; see ``vote_last_response``."""
    logger.info(f"rightvote (anony). ip: {get_ip(request)}")
    yield from vote_last_response(
        [state0, state1], "rightvote", [model_selector0, model_selector1], request
    )


def tievote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "Tie" button; see ``vote_last_response``."""
    logger.info(f"tievote (anony). ip: {get_ip(request)}")
    yield from vote_last_response(
        [state0, state1], "tievote", [model_selector0, model_selector1], request
    )


def bothbad_vote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Handle the "Both are bad" button; see ``vote_last_response``."""
    logger.info(f"bothbad_vote (anony). ip: {get_ip(request)}")
    yield from vote_last_response(
        [state0, state1], "bothbad_vote", [model_selector0, model_selector1], request
    )


def regenerate(state0, state1, request: gr.Request):
    """Reset the last message on both sides so the images can be regenerated.

    Returns:
        A list of 11 values: the two states, two empty ``gr.Image``
        components, an empty string for the textbox, and six ``disable_btn``
        updates.
    """
    logger.info(f"regenerate (anony). ip: {get_ip(request)}")
    states = [state0, state1]
    for state in states:
        # NOTE(review): add_text() in this file no longer appends an empty
        # assistant turn, so the "last message" here may be the user prompt
        # itself -- confirm against diffusion_response() that the prompt
        # survives a regenerate.
        state.conv.update_last_message(None)
    return states + [gr.Image() for _ in states] + [""] + [disable_btn] * 6


def clear_history(request: gr.Request):
    """Return the component values that reset the tab for a new round.

    Returns:
        A list of 14 values: ``num_sides`` empty states, ``num_sides`` empty
        images, the anonymous name labels, an empty textbox value, four
        ``invisible_btn`` updates (vote buttons), two ``disable_btn`` updates
        (regenerate / new round), and an empty slow-model warning.
    """
    logger.info(f"clear_history (anony). ip: {get_ip(request)}")
    return (
        [None] * num_sides
        + [gr.Image()] * num_sides
        + anony_names
        + [""]
        + [invisible_btn] * 4
        + [disable_btn] * 2
        + [""]
    )


def share_click(state0, state1, model_selector0, model_selector1, request: gr.Request):
    """Log a "share" event when both sides have a state.

    The record is written directly through ``_log_vote``.
    ``vote_last_response`` is a generator, so merely calling it (as this
    function used to) never executed its body and nothing was logged.
    """
    logger.info(f"share (anony). ip: {get_ip(request)}")
    if state0 is not None and state1 is not None:
        _log_vote(
            [state0, state1], "share", [model_selector0, model_selector1], request
        )


# Base sampling weight per model; models not listed default to 1.0
# (see get_sample_weight).
SAMPLING_WEIGHTS = {
    # tier 0
    "stable-diffusion-v1-4": 4,
    "stable-diffusion-v1-5": 4,
    "imagenhub_dreambooth": 4,
    "gpt-4": 4,
    "gpt-4-turbo": 4,
    "gpt-3.5-turbo": 2,
    "gpt-3.5-turbo-1106": 2,
    "claude-2.1": 4,
    "claude-2.0": 2,
    "claude-1": 2,
    "claude-instant-1": 4,
    "openhermes-2.5-mistral-7b": 2,
    "wizardlm-70b": 2,
    "starling-lm-7b-alpha": 2,
    "tulu-2-dpo-70b": 2,
    "yi-34b-chat": 2,
    "zephyr-7b-beta": 2,
    "openchat-3.5": 2,
    "chatglm3-6b": 2,
    # tier 1
    "deluxe-chat-v1.1": 4,
    "palm-2": 1.5,
    "llama-2-70b-chat": 1.5,
    "llama-2-13b-chat": 1.5,
    "codellama-34b-instruct": 1.5,
    "vicuna-33b": 4,
    "vicuna-13b": 1.5,
    "wizardlm-13b": 1.5,
    "qwen-14b-chat": 1.5,
    "mistral-7b-instruct": 1.5,
    # tier 2
    "vicuna-7b": 1.0,
    "llama-2-7b-chat": 1.0,
    "chatglm2-6b": 1.0,
    # deprecated
    "zephyr-7b-alpha": 1.5,
    "codellama-13b-instruct": 1.0,
    "mpt-30b-chat": 1.5,
    "guanaco-33b": 1.0,
    "fastchat-t5-3b": 0.5,
    "alpaca-13b": 0.5,
    "mpt-7b-chat": 0.1,
    "oasst-pythia-12b": 0.1,
    "RWKV-4-Raven-14B": 0.1,
    "gpt4all-13b-snoozy": 0.1,
    "koala-13b": 0.1,
    "stablelm-tuned-alpha-7b": 0.1,
    "dolly-v2-12b": 0.1,
    "llama-13b": 0.1,
    "chatglm-6b": 0.5,
    "deluxe-chat-v1": 4,
}

# target model sampling weights will be boosted.
BATTLE_TARGETS = {
    "imagenhub": {"imagenhub_dreambooth"},
    "stable-diffusion": {"stable-diffusion-v1-4", "stable-diffusion-v1-5"},
    "gpt-4": {"claude-2.1", "gpt-4-turbo"},
    "gpt-4-turbo": {"gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-1106", "claude-2.1"},
    "gpt-3.5-turbo": {"claude-instant-1", "gpt-4", "claude-2.1"},
    "gpt-3.5-turbo-1106": {"claude-instant-1", "gpt-3.5-turbo"},
    "claude-2.1": {"gpt-4-turbo", "gpt-4", "claude-1"},
    "claude-2.0": {"gpt-4-turbo", "gpt-4", "claude-1"},
    "claude-1": {"claude-2.1", "gpt-4", "gpt-3.5-turbo"},
    "claude-instant-1": {"gpt-3.5-turbo-1106", "claude-2.1"},
    "deluxe-chat-v1.1": {"gpt-4", "gpt-4-turbo"},
    "openhermes-2.5-mistral-7b": {"gpt-3.5-turbo", "openchat-3.5", "zephyr-7b-beta"},
    "starling-lm-7b-alpha": {"gpt-3.5-turbo", "openchat-3.5", "zephyr-7b-beta"},
    "tulu-2-dpo-70b": {"gpt-3.5-turbo", "vicuna-33b", "claude-instant-1"},
    "yi-34b-chat": {"gpt-3.5-turbo", "vicuna-33b", "claude-instant-1"},
    "openchat-3.5": {"gpt-3.5-turbo", "llama-2-70b-chat", "zephyr-7b-beta"},
    "chatglm3-6b": {"yi-34b-chat", "qwen-14b-chat"},
    "qwen-14b-chat": {"vicuna-13b", "llama-2-13b-chat", "llama-2-70b-chat"},
    "zephyr-7b-alpha": {"mistral-7b-instruct", "llama-2-13b-chat"},
    "zephyr-7b-beta": {
        "mistral-7b-instruct",
        "llama-2-13b-chat",
        "llama-2-7b-chat",
        "wizardlm-13b",
    },
    "llama-2-70b-chat": {"gpt-3.5-turbo", "vicuna-33b", "claude-instant-1"},
    "llama-2-13b-chat": {"mistral-7b-instruct", "vicuna-13b", "llama-2-70b-chat"},
    "llama-2-7b-chat": {"mistral-7b-instruct", "vicuna-7b", "llama-2-13b-chat"},
    "mistral-7b-instruct": {
        "llama-2-7b-chat",
        "llama-2-13b-chat",
        "llama-2-70b-chat",
    },
    "vicuna-33b": {"llama-2-70b-chat", "gpt-3.5-turbo", "claude-instant-1"},
    "vicuna-13b": {"llama-2-13b-chat", "llama-2-70b-chat"},
    "vicuna-7b": {"llama-2-7b-chat", "mistral-7b-instruct", "llama-2-13b-chat"},
    "wizardlm-70b": {"gpt-3.5-turbo", "vicuna-33b", "claude-instant-1"},
    "palm-2": {"llama-2-13b-chat", "gpt-3.5-turbo"},
}

# Models whose sampling weight is multiplied by 5 (see get_sample_weight).
SAMPLING_BOOST_MODELS = [
    "tulu-2-dpo-70b",
    "yi-34b-chat",
    "claude-2.1",
    "wizardlm-70b",
    "starling-lm-7b-alpha",
    "openhermes-2.5-mistral-7b",
    "gpt-3.5-turbo-1106",
    # "openchat-3.5",
    # "gpt-4-turbo",
    # "claude-1",
]

# outage models won't be sampled.
OUTAGE_MODELS = [
    "zephyr-7b-alpha",
    "falcon-180b-chat",
]


def get_sample_weight(model):
    """Return the sampling weight of ``model``.

    Models in ``OUTAGE_MODELS`` get 0. Otherwise the weight is the
    ``SAMPLING_WEIGHTS`` entry (default 1.0), multiplied by 5 when the model
    is listed in ``SAMPLING_BOOST_MODELS``.
    """
    if model in OUTAGE_MODELS:
        return 0
    weight = SAMPLING_WEIGHTS.get(model, 1.0)
    if model in SAMPLING_BOOST_MODELS:
        weight *= 5
    return weight


def get_battle_pair():
    """Randomly pick two models from the module-level ``models`` list.

    The first model is drawn proportionally to ``get_sample_weight``. The
    rival is drawn from the remaining models, with the chosen model's
    ``BATTLE_TARGETS`` boosted. The returned order is randomised.

    Returns:
        A ``(left_model, right_model)`` tuple of model names. When only one
        model exists, or no other model has a non-zero weight, the chosen
        model is paired with itself.

    Raises:
        ValueError: If ``models`` is empty or every model has zero weight.
    """
    if not models:
        raise ValueError("No models are available to sample a battle pair from.")
    if len(models) == 1:
        return models[0], models[0]

    model_weights = np.array([get_sample_weight(m) for m in models], dtype=float)
    total_weight = model_weights.sum()
    if total_weight <= 0:
        raise ValueError("All available models have zero sampling weight.")

    chosen_idx = np.random.choice(len(models), p=model_weights / total_weight)
    chosen_model = models[chosen_idx]

    targets = BATTLE_TARGETS.get(chosen_model, ())
    rival_models = []
    rival_weights = []
    for model in models:
        if model == chosen_model:
            continue
        weight = get_sample_weight(model)
        if weight != 0 and model in targets:
            # boost to 50% chance
            weight = total_weight / len(targets)
        rival_models.append(model)
        rival_weights.append(weight)

    rival_total = float(np.sum(rival_weights))
    if rival_total <= 0:
        # Every other model is in outage (or there is no distinct rival):
        # normalising would divide by zero, so pair the model with itself,
        # as in the single-model case above.
        return chosen_model, chosen_model

    rival_probs = np.array(rival_weights, dtype=float) / rival_total
    rival_idx = np.random.choice(len(rival_models), p=rival_probs)
    rival_model = rival_models[rival_idx]

    swap = np.random.randint(2)
    if swap == 0:
        return chosen_model, rival_model
    return rival_model, chosen_model


def add_text(
    state0, state1, model_selector0, model_selector1, text, request: gr.Request
):
    """Record a new user prompt on both sides of the battle.

    Creates the two ``ImageState`` objects (with a freshly sampled model
    pair) when the round has not started yet.

    Args:
        state0, state1: Current per-side states, or ``None`` before the first
            prompt of a round.
        model_selector0, model_selector1: Current model-name label values;
            accepted because the UI passes them, not read here.
        text: The prompt entered by the user.
        request: The Gradio request, used for logging the client IP.

    Returns:
        A list of 12 values: the two states, two empty ``gr.Image``
        components, the textbox value, six button updates, and the
        slow-model warning text.
    """
    ip = get_ip(request)
    logger.info(f"add_text (anony). ip: {ip}. len: {len(text)}")
    states = [state0, state1]

    # Init states if necessary
    if states[0] is None:
        assert states[1] is None

        model_left, model_right = get_battle_pair()
        states = [
            ImageState(model_left),
            ImageState(model_right),
        ]

    if len(text) <= 0:
        # Empty prompt: tell the follow-up generation step to do nothing.
        for state in states:
            state.skip_next = True
        return (
            states
            + [gr.Image() for _ in states]
            + [""]
            + [no_change_btn] * 6
            + [""]
        )

    model_list = [state.model_name for state in states]
    flagged = moderation_filter(text, model_list)
    if flagged:
        logger.info(f"violate moderation (anony). ip: {ip}. text: {text}")
        # overwrite the original text
        text = MODERATION_MSG

    conv = states[0].conv
    if (len(conv.messages) - conv.offset) // 2 >= CONVERSATION_TURN_LIMIT:
        logger.info(f"conversation turn limit. ip: {ip}. text: {text}")
        for state in states:
            state.skip_next = True
        return (
            states
            + [gr.Image() for _ in states]
            + [CONVERSATION_LIMIT_MSG]
            + [no_change_btn] * 6
            + [""]
        )

    text = text[:INPUT_CHAR_LEN_LIMIT]  # Hard cut-off
    for state in states:
        # Only the user turn is appended; no empty assistant turn is added.
        state.conv.append_message(state.conv.roles[0], text)
        state.skip_next = False

    slow_model_msg = ""
    if any("deluxe" in state.model_name for state in states):
        slow_model_msg = SLOW_MODEL_MSG

    return (
        states
        + [gr.Image() for _ in states]
        + [text]
        + [disable_btn] * 6
        + [slow_model_msg]
    )


def diffusion_response_multi(
    state0,
    state1,
    request: gr.Request,
):
    """Run ``diffusion_response`` for both sides and stream their outputs.

    Yields:
        On each step, the two states, the two latest image outputs, and six
        button updates (10 values). When ``state0`` is missing or flagged
        with ``skip_next``, a single no-op update is yielded instead.
    """
    logger.info(f"bot_response_multi (anony). ip: {get_ip(request)}")

    if state0 is None or state0.skip_next:
        # This generate call is skipped due to invalid inputs
        yield (
            state0,
            state1,
            gr.Image(),
            gr.Image(),
        ) + (no_change_btn,) * 6
        return

    states = [state0, state1]
    gen = [diffusion_response(states[i], request) for i in range(num_sides)]

    chatbots = [None] * num_sides
    while True:
        # Advance every generator once per round; stop after a round in
        # which all of them were already exhausted.
        stop = True
        for i in range(num_sides):
            try:
                ret = next(gen[i])
                states[i], chatbots[i] = ret[0], ret[1]
                stop = False
            except StopIteration:
                pass
        yield states + chatbots + [disable_btn] * 6
        if stop:
            break


def build_side_by_side_ui_anony(models):
    """Build the anonymous side-by-side arena tab and wire its event handlers.

    Must be called inside a ``gr.Blocks`` context.

    Args:
        models: Model names, used here only to render the model description
            accordion.

    Returns:
        A list with the two ``gr.State`` components followed by the two
        model-name ``gr.Markdown`` components.
    """
    logger.info("build_side_by_side_ui_anony")
    notice_markdown = """
# ⚔️ ImagenHub Arena ⚔️ : Standardizing the evaluation of conditional image generation models
| [GitHub](https://github.com/TIGER-AI-Lab/ImagenHub) | [Paper](https://arxiv.org/abs/2310.01596) | [Dataset](https://huggingface.co/ImagenHub) | [Twitter](https://twitter.com/???) | [Discord](https://discord.gg/???) |

## 📜 Rules
- Ask any question to two anonymous models in same area (e.g., Dalle-2, Stable Diffusion XL in Text-guided Image Generation Model, MagicBrush, InstructPix2Pix in Text-guided Image Editing Model) and vote for the better one!
- You can continue chatting until you identify a winner.
- Vote won't be counted if model identity is revealed during conversation.

## 🏆 Arena Elo
We introduce ImagenHub, a one-stop library to standardize the inference and evaluation of all the conditional image generation models.
Find out who is the 🥇conditional image generation models!

## 👇 Generation now!
"""
    # [Leaderboard](https://???)

    states = [gr.State() for _ in range(num_sides)]
    model_selectors = [None] * num_sides
    chatbots = [None] * num_sides

    gr.Markdown(notice_markdown, elem_id="notice_markdown")

    with gr.Group(elem_id="share-region-anony"):
        with gr.Accordion("🔍 Expand to see 20+ Arena players", open=False):
            model_description_md = get_model_description_md(models)
            gr.Markdown(model_description_md, elem_id="model_description_markdown")
        with gr.Row():
            for i in range(num_sides):
                with gr.Column():
                    # Despite the name, each "chatbot" is an image output.
                    chatbots[i] = gr.Image(elem_id="chatbot", height=550)

        with gr.Row():
            for i in range(num_sides):
                with gr.Column():
                    model_selectors[i] = gr.Markdown(anony_names[i])
        with gr.Row():
            slow_warning = gr.Markdown("", elem_id="notice_markdown")

    with gr.Row():
        leftvote_btn = gr.Button(
            value="👈 A is better", visible=False, interactive=False
        )
        rightvote_btn = gr.Button(
            value="👉 B is better", visible=False, interactive=False
        )
        tie_btn = gr.Button(value="🤝 Tie", visible=False, interactive=False)
        bothbad_btn = gr.Button(
            value="👎 Both are bad", visible=False, interactive=False
        )

    with gr.Row():
        textbox = gr.Textbox(
            show_label=False,
            placeholder="👉 Enter your prompt and press ENTER",
            container=True,
            elem_id="input_box",
        )
        send_btn = gr.Button(value="Send", variant="primary", scale=0)

    with gr.Row() as button_row:
        clear_btn = gr.Button(value="🎲 New Round", interactive=False)
        regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False)
        share_btn = gr.Button(value="📷 Share")

    gr.Markdown(acknowledgment_md, elem_id="ack_markdown")

    # Register listeners
    btn_list = [
        leftvote_btn,
        rightvote_btn,
        tie_btn,
        bothbad_btn,
        regenerate_btn,
        clear_btn,
    ]
    # Components updated by every vote handler (7 values per yield).
    vote_outputs = model_selectors + [
        textbox,
        leftvote_btn,
        rightvote_btn,
        tie_btn,
        bothbad_btn,
    ]
    # Components updated by add_text (12 values). Both the ENTER and the
    # "Send" paths must use this same list, otherwise Gradio receives more
    # return values than outputs.
    add_text_outputs = states + chatbots + [textbox] + btn_list + [slow_warning]

    leftvote_btn.click(
        leftvote_last_response,
        states + model_selectors,
        vote_outputs,
    )
    rightvote_btn.click(
        rightvote_last_response,
        states + model_selectors,
        vote_outputs,
    )
    tie_btn.click(
        tievote_last_response,
        states + model_selectors,
        vote_outputs,
    )
    bothbad_btn.click(
        bothbad_vote_last_response,
        states + model_selectors,
        vote_outputs,
    )
    regenerate_btn.click(
        regenerate, states, states + chatbots + [textbox] + btn_list
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons, [], btn_list
    )
    clear_btn.click(
        clear_history,
        None,
        states + chatbots + model_selectors + [textbox] + btn_list + [slow_warning],
    )

    share_js = """
function (a, b, c, d) {
    const captureElement = document.querySelector('#share-region-anony');
    html2canvas(captureElement)
        .then(canvas => {
            canvas.style.display = 'none'
            document.body.appendChild(canvas)
            return canvas
        })
        .then(canvas => {
            const image = canvas.toDataURL('image/png')
            const a = document.createElement('a')
            a.setAttribute('download', 'chatbot-arena.png')
            a.setAttribute('href', image)
            a.click()
            canvas.remove()
        });
    return [a, b, c, d];
}
"""
    share_btn.click(share_click, states + model_selectors, [], _js=share_js)

    textbox.submit(
        add_text,
        states + model_selectors + [textbox],
        add_text_outputs,
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons,
        [],
        btn_list,
    )

    send_btn.click(
        add_text,
        states + model_selectors + [textbox],
        add_text_outputs,
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons, [], btn_list
    )

    return states + model_selectors