Spaces:
Runtime error
Runtime error
""" | |
ImagenHub Arena (side-by-side) tab.
Users send one prompt to two chosen image generation models and vote on the outputs.
""" | |
import json | |
import time | |
import gradio as gr | |
import numpy as np | |
from fastchat.constants import ( | |
MODERATION_MSG, | |
CONVERSATION_LIMIT_MSG, | |
INPUT_CHAR_LEN_LIMIT, | |
CONVERSATION_TURN_LIMIT, | |
) | |
from fastchat.model.model_adapter import get_conversation_template | |
from fastchat.serve.gradio_web_server import ( | |
ImageState, | |
diffusion_response, | |
bot_response, | |
get_conv_log_filename, | |
no_change_btn, | |
enable_btn, | |
disable_btn, | |
invisible_btn, | |
acknowledgment_md, | |
get_model_description_md, | |
ip_expiration_dict, | |
get_ip, | |
) | |
from fastchat.utils import ( | |
build_logger, | |
moderation_filter, | |
) | |
# Logger for this module, created through fastchat's build_logger helper.
logger = build_logger("gradio_web_server_multi", "gradio_web_server_multi.log")
# Number of models compared at once; sizes the per-side lists built in this file.
num_sides = 2
# Moderation flag; reassigned by set_global_vars_named().
enable_moderation = False
def set_global_vars_named(enable_moderation_):
    """Set the module-level ``enable_moderation`` flag.

    Args:
        enable_moderation_: New value stored in the ``enable_moderation``
            global of this module.
    """
    global enable_moderation
    enable_moderation = enable_moderation_
def load_demo_side_by_side_named(models, url_params):
    """Build the initial outputs for the named side-by-side tab.

    The left selector defaults to the first model. The right selector is
    sampled from the remaining models, with earlier entries weighted more
    heavily: weight 8 for the first four candidates, 4 for the next eight
    and 1 for every candidate after that.

    Args:
        models: Ordered list of model names offered in both dropdowns.
        url_params: Not used by this function.

    Returns:
        A tuple holding ``num_sides`` ``None`` states followed by one
        ``gr.Dropdown.update`` per side.
    """
    logger.info("load_demo_side_by_side_named")
    states = (None,) * num_sides

    model_left = models[0] if len(models) > 0 else ""
    if len(models) > 1:
        candidates = models[1:]
        tiers = [8] * 4 + [4] * 8 + [1] * 32
        # The fixed table covers 44 candidates. Pad it with weight 1 so that
        # `p` always has the same length as `candidates`; np.random.choice
        # raises ValueError when the two sizes differ.
        padding = [1] * max(0, len(candidates) - len(tiers))
        weights = np.array((tiers + padding)[: len(candidates)], dtype=float)
        weights = weights / weights.sum()
        model_right = np.random.choice(candidates, p=weights)
    else:
        # Zero or one model available: show the same entry on both sides.
        model_right = model_left

    selector_updates = (
        gr.Dropdown.update(choices=models, value=model_left, visible=True),
        gr.Dropdown.update(choices=models, value=model_right, visible=True),
    )
    return states + selector_updates
def vote_last_response(states, vote_type, model_selectors, request: gr.Request):
    """Append one vote record as a JSON line to the conversation log file.

    Args:
        states: Per-side state objects; each one's ``dict()`` is logged.
        vote_type: Label stored under the ``"type"`` key of the record.
        model_selectors: Iterable of the selected model names.
        request: Gradio request, passed to ``get_ip`` for the ``"ip"`` field.
    """
    with open(get_conv_log_filename(), "a") as log_file:
        # Keyword order matches the key order written to the log.
        record = dict(
            tstamp=round(time.time(), 4),
            type=vote_type,
            models=list(model_selectors),
            states=[state.dict() for state in states],
            ip=get_ip(request),
        )
        log_file.write(json.dumps(record) + "\n")
def leftvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Log a "leftvote" for the current pair and return the follow-up updates.

    Returns:
        A 5-tuple: an empty string followed by four ``disable_btn`` values.
    """
    client_ip = get_ip(request)
    logger.info(f"leftvote (named). ip: {client_ip}")
    pair_states = [state0, state1]
    pair_models = [model_selector0, model_selector1]
    vote_last_response(pair_states, "leftvote", pair_models, request)
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def rightvote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Log a "rightvote" for the current pair and return the follow-up updates.

    Returns:
        A 5-tuple: an empty string followed by four ``disable_btn`` values.
    """
    client_ip = get_ip(request)
    logger.info(f"rightvote (named). ip: {client_ip}")
    pair_states = [state0, state1]
    pair_models = [model_selector0, model_selector1]
    vote_last_response(pair_states, "rightvote", pair_models, request)
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def tievote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Log a "tievote" for the current pair and return the follow-up updates.

    Returns:
        A 5-tuple: an empty string followed by four ``disable_btn`` values.
    """
    client_ip = get_ip(request)
    logger.info(f"tievote (named). ip: {client_ip}")
    pair_states = [state0, state1]
    pair_models = [model_selector0, model_selector1]
    vote_last_response(pair_states, "tievote", pair_models, request)
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
def bothbad_vote_last_response(
    state0, state1, model_selector0, model_selector1, request: gr.Request
):
    """Log a "bothbad_vote" for the current pair and return the follow-up updates.

    Returns:
        A 5-tuple: an empty string followed by four ``disable_btn`` values.
    """
    client_ip = get_ip(request)
    logger.info(f"bothbad_vote (named). ip: {client_ip}")
    pair_states = [state0, state1]
    pair_models = [model_selector0, model_selector1]
    vote_last_response(pair_states, "bothbad_vote", pair_models, request)
    return ("", disable_btn, disable_btn, disable_btn, disable_btn)
# def regenerate(state0, state1, request: gr.Request): | |
# logger.info(f"regenerate (named). ip: {get_ip(request)}") | |
# states = [state0, state1] | |
# for i in range(num_sides): | |
# states[i].conv.update_last_message(None) | |
# return states + [x.to_gradio_chatbot() for x in states] + [""] + [disable_btn] * 6 | |
def regenerate(state0, state1, request: gr.Request):
    """Reset the last message of both conversations before regenerating.

    Args:
        state0: State of the left side; its ``conv`` is updated in place.
        state1: State of the right side; its ``conv`` is updated in place.
        request: Gradio request, used only to log the caller's IP.

    Returns:
        A list: the two states, one fresh ``gr.Image()`` per side, an empty
        string, and six ``disable_btn`` values.
    """
    # This is the named tab, so the label matches the other handlers here.
    logger.info(f"regenerate (named). ip: {get_ip(request)}")
    states = [state0, state1]
    for state in states:
        # NOTE(review): add_text in this file appends only the user message;
        # confirm that the last message is the one meant to be cleared here.
        state.conv.update_last_message(None)
    return states + [gr.Image() for _ in states] + [""] + [disable_btn] * 6
def clear_history(request: gr.Request):
    """Return the updates that reset both sides of the tab.

    Returns:
        A list: ``num_sides`` ``None`` states, ``num_sides`` references to one
        ``gr.Image()``, an empty string, four ``invisible_btn`` values and two
        ``disable_btn`` values.
    """
    logger.info(f"clear_history (named). ip: {get_ip(request)}")
    outputs = [None] * num_sides
    outputs += [gr.Image()] * num_sides
    outputs.append("")
    outputs += [invisible_btn] * 4
    outputs += [disable_btn] * 2
    return outputs
def share_click(state0, state1, model_selector0, model_selector1, request: gr.Request):
    """Log a "share" record when both sides already have a state."""
    logger.info(f"share (named). ip: {get_ip(request)}")
    # Nothing to record until both states exist.
    if state0 is None or state1 is None:
        return
    vote_last_response(
        [state0, state1], "share", [model_selector0, model_selector1], request
    )
# class ImageState: | |
# def __init__(self, model_name): | |
# self.conv = get_conversation_template(model_name) | |
# # self.conv_id = uuid.uuid4().hex | |
# self.skip_next = False | |
# self.model_name = model_name | |
# self.prompt = None | |
# # self.conv = prompt | |
# self.output = None | |
# | |
# # if model_name == "palm-2": | |
# # # According to release note, "chat-bison@001" is PaLM 2 for chat. | |
# # # https://cloud.google.com/vertex-ai/docs/release-notes#May_10_2023 | |
# # self.palm_chat = init_palm_chat("chat-bison@001") | |
# | |
# # def to_gradio_chatbot(self): | |
# # return self.conv.to_gradio_chatbot() | |
# | |
# def dict(self): | |
# base = { | |
# "model_name": self.model_name, | |
# } | |
# return base | |
def add_text(
    state0, state1, model_selector0, model_selector1, text, request: gr.Request
):
    """Validate the submitted prompt and append it to both conversations.

    Missing states are created as ``ImageState`` objects for the selected
    models. Empty input and conversations that already reached
    ``CONVERSATION_TURN_LIMIT`` set ``skip_next`` on both states, which makes
    ``diffusion_response_multi`` yield without generating. Text flagged by
    ``moderation_filter`` is replaced with ``MODERATION_MSG`` before use.

    Args:
        state0: State of the left side, or ``None`` before the first prompt.
        state1: State of the right side, or ``None`` before the first prompt.
        model_selector0: Model name selected for the left side.
        model_selector1: Model name selected for the right side.
        text: Prompt string from the textbox.
        request: Gradio request, used to resolve the caller's IP for logging.

    Returns:
        A list: the two states, one fresh ``gr.Image()`` per side, the new
        textbox value, and six identical button updates.
    """
    ip = get_ip(request)
    logger.info(f"add_text (named). ip: {ip}. len: {len(text)}")
    states = [state0, state1]
    model_selectors = [model_selector0, model_selector1]

    def build_outputs(textbox_value, btn_update):
        # Every exit path returns the same shape; only these two values vary.
        return (
            states
            + [gr.Image() for _ in states]
            + [textbox_value]
            + [btn_update] * 6
        )

    # Init states if necessary
    if states[0] is None:
        assert states[1] is None
    for i in range(num_sides):
        if states[i] is None:
            states[i] = ImageState(model_selectors[i])

    if not text:
        for state in states:
            state.skip_next = True
        return build_outputs("", no_change_btn)

    model_list = [state.model_name for state in states]
    flagged = moderation_filter(text, model_list)
    if flagged:
        logger.info(f"violate moderation (named). ip: {ip}. text: {text}")
        # overwrite the original text
        text = MODERATION_MSG

    conv = states[0].conv
    if (len(conv.messages) - conv.offset) // 2 >= CONVERSATION_TURN_LIMIT:
        # Reuse the IP resolved above instead of calling get_ip() again.
        logger.info(f"conversation turn limit. ip: {ip}. text: {text}")
        for state in states:
            state.skip_next = True
        return build_outputs(CONVERSATION_LIMIT_MSG, no_change_btn)

    text = text[:INPUT_CHAR_LEN_LIMIT]  # Hard cut-off
    for state in states:
        state.conv.append_message(state.conv.roles[0], text)
        # state.conv.append_message(state.conv.roles[1], None)
        state.skip_next = False

    return build_outputs(text, disable_btn)
# def add_text( | |
# state0, state1, model_selector0, model_selector1, text, request: gr.Request | |
# ): | |
# ip = get_ip(request) | |
# logger.info(f"add_text (named). ip: {ip}. len: {len(text)}") | |
# states = [state0, state1] | |
# model_selectors = [model_selector0, model_selector1] | |
# | |
# # Init states if necessary | |
# for i in range(num_sides): | |
# if states[i] is None: | |
# states[i] = State(model_selectors[i]) | |
# | |
# if len(text) <= 0: | |
# for i in range(num_sides): | |
# states[i].skip_next = True | |
# return ( | |
# states | |
# + [x.to_gradio_chatbot() for x in states] | |
# + [""] | |
# + [ | |
# no_change_btn, | |
# ] | |
# * 6 | |
# ) | |
# | |
# model_list = [states[i].model_name for i in range(num_sides)] | |
# flagged = moderation_filter(text, model_list) | |
# if flagged: | |
# logger.info(f"violate moderation (named). ip: {ip}. text: {text}") | |
# # overwrite the original text | |
# text = MODERATION_MSG | |
# | |
# conv = states[0].conv | |
# if (len(conv.messages) - conv.offset) // 2 >= CONVERSATION_TURN_LIMIT: | |
# logger.info(f"conversation turn limit. ip: {ip}. text: {text}") | |
# for i in range(num_sides): | |
# states[i].skip_next = True | |
# return ( | |
# states | |
# + [x.to_gradio_chatbot() for x in states] | |
# + [CONVERSATION_LIMIT_MSG] | |
# + [ | |
# no_change_btn, | |
# ] | |
# * 6 | |
# ) | |
# | |
# text = text[:INPUT_CHAR_LEN_LIMIT] # Hard cut-off | |
# for i in range(num_sides): | |
# states[i].conv.append_message(states[i].conv.roles[0], text) | |
# states[i].conv.append_message(states[i].conv.roles[1], None) | |
# states[i].skip_next = False | |
# | |
# return ( | |
# states | |
# + [x.to_gradio_chatbot() for x in states] | |
# + [""] | |
# + [ | |
# disable_btn, | |
# ] | |
# * 6 | |
# ) | |
def diffusion_response_multi(
    state0,
    state1,
    request: gr.Request,
):
    """Run ``diffusion_response`` for both sides and yield interleaved updates.

    Args:
        state0: State of the left side; ``None`` or ``skip_next`` set means
            the generation is skipped.
        state1: State of the right side.
        request: Gradio request forwarded to ``diffusion_response``.

    Yields:
        When skipped: a single tuple of both states, two ``gr.Image()`` and
        six ``no_change_btn`` values. Otherwise: after every round over both
        generators, a list of the two states, the latest output per side and
        six ``disable_btn`` values.
    """
    # The label names this function and this (named) tab.
    logger.info(f"diffusion_response_multi (named). ip: {get_ip(request)}")

    if state0 is None or state0.skip_next:
        # This generate call is skipped due to invalid inputs
        yield (
            state0,
            state1,
            gr.Image(),
            gr.Image(),
        ) + (no_change_btn,) * 6
        return

    states = [state0, state1]
    gen = [diffusion_response(state, request) for state in states]

    chatbots = [None] * num_sides
    while True:
        stop = True
        for i in range(num_sides):
            try:
                ret = next(gen[i])
            except StopIteration:
                # This side is finished; keep its last state and output.
                continue
            states[i], chatbots[i] = ret[0], ret[1]
            stop = False
        yield states + chatbots + [disable_btn] * 6
        # Stop once a full round produced nothing new on either side.
        if stop:
            break
# def bot_response_multi( | |
# state0, | |
# state1, | |
# temperature, | |
# top_p, | |
# max_new_tokens, | |
# request: gr.Request, | |
# ): | |
# logger.info(f"bot_response_multi (named). ip: {get_ip(request)}") | |
# | |
# if state0.skip_next: | |
# # This generate call is skipped due to invalid inputs | |
# yield ( | |
# state0, | |
# state1, | |
# state0.to_gradio_chatbot(), | |
# state1.to_gradio_chatbot(), | |
# ) + (no_change_btn,) * 6 | |
# return | |
# | |
# states = [state0, state1] | |
# gen = [] | |
# for i in range(num_sides): | |
# gen.append( | |
# bot_response( | |
# states[i], | |
# temperature, | |
# top_p, | |
# max_new_tokens, | |
# request, | |
# ) | |
# ) | |
# | |
# chatbots = [None] * num_sides | |
# while True: | |
# stop = True | |
# for i in range(num_sides): | |
# try: | |
# ret = next(gen[i]) | |
# states[i], chatbots[i] = ret[0], ret[1] | |
# stop = False | |
# except StopIteration: | |
# pass | |
# yield states + chatbots + [disable_btn] * 6 | |
# if stop: | |
# break | |
def flash_buttons():
    """Yield four alternating button-update lists, pausing 0.5 s after each.

    The sequence alternates between "four disabled, two enabled" and
    "all six enabled", ending on the all-enabled list.
    """
    votes_off = [disable_btn] * 4 + [enable_btn] * 2
    all_on = [enable_btn] * 6
    for frame in (votes_off, all_on, votes_off, all_on):
        yield frame
        time.sleep(0.5)
def build_side_by_side_ui_named(models):
    """Create the components and event wiring of the named side-by-side tab.

    Args:
        models: List of model names; used as dropdown choices and passed to
            ``get_model_description_md``.

    Returns:
        A list with the ``num_sides`` ``gr.State`` components followed by the
        ``num_sides`` model selector dropdowns.
    """
    logger.info("build_side_by_side_ui_named")
    notice_markdown = """
# βοΈ ImagenHub Arena βοΈ : Standardizing the evaluation of conditional image generation models
| [GitHub](https://github.com/TIGER-AI-Lab/ImagenHub) | [Paper](https://arxiv.org/abs/2310.01596) | [Dataset](https://huggingface.co/ImagenHub) | [Twitter](https://twitter.com/???) | [Discord](https://discord.gg/???) |
## π Rules
- Chat with any two models side-by-side and vote!
- You can continue chatting for multiple rounds.
- Click "Clear history" to start a new round.
## π€ Choose two models to compare
"""
    # Per-side containers; the selector and output slots are filled in below.
    states = [gr.State() for _ in range(num_sides)]
    model_selectors = [None] * num_sides
    # Despite the name, each entry is a gr.Image output.
    chatbots = [None] * num_sides
    notice = gr.Markdown(notice_markdown, elem_id="notice_markdown")
    # This element id is the region captured by the share_js snippet below.
    with gr.Group(elem_id="share-region-named"):
        with gr.Row():
            for i in range(num_sides):
                with gr.Column():
                    # Side i defaults to the i-th model when one exists.
                    model_selectors[i] = gr.Dropdown(
                        choices=models,
                        value=models[i] if len(models) > i else "",
                        interactive=True,
                        show_label=False,
                        container=False,
                    )
        with gr.Row():
            with gr.Accordion("π Expand to see 20+ model descriptions", open=False):
                model_description_md = get_model_description_md(models)
                gr.Markdown(model_description_md, elem_id="model_description_markdown")
        with gr.Row():
            for i in range(num_sides):
                # `label` is only referenced by the commented-out arguments below.
                label = "Model A" if i == 0 else "Model B"
                with gr.Column():
                    chatbots[i] = gr.Image(
                        # label=label, elem_id=f"chatbot", height=550
                    )
        # The four vote buttons start hidden and non-interactive.
        with gr.Row():
            leftvote_btn = gr.Button(
                value="π A is better", visible=False, interactive=False
            )
            rightvote_btn = gr.Button(
                value="π B is better", visible=False, interactive=False
            )
            tie_btn = gr.Button(value="π€ Tie", visible=False, interactive=False)
            bothbad_btn = gr.Button(
                value="π Both are bad", visible=False, interactive=False
            )
    with gr.Row():
        textbox = gr.Textbox(
            show_label=False,
            placeholder="π Enter your prompt and press ENTER",
            elem_id="input_box",
        )
        send_btn = gr.Button(value="Send", variant="primary", scale=0)
    with gr.Row() as button_row:
        clear_btn = gr.Button(value="ποΈ Clear history", interactive=False)
        regenerate_btn = gr.Button(value="π Regenerate", interactive=False)
        share_btn = gr.Button(value="π· Share")
    # with gr.Accordion("Parameters", open=False) as parameter_row:
    # temperature = gr.Slider(
    # minimum=0.0,
    # maximum=1.0,
    # value=0.7,
    # step=0.1,
    # interactive=True,
    # label="Temperature",
    # )
    # top_p = gr.Slider(
    # minimum=0.0,
    # maximum=1.0,
    # value=1.0,
    # step=0.1,
    # interactive=True,
    # label="Top P",
    # )
    # max_output_tokens = gr.Slider(
    # minimum=16,
    # maximum=1024,
    # value=512,
    # step=64,
    # interactive=True,
    # label="Max output tokens",
    # )
    gr.Markdown(acknowledgment_md, elem_id="ack_markdown")
    # Register listeners
    # Order matters: the handlers in this file return their six button
    # updates positionally (four vote buttons, then regenerate, then clear).
    btn_list = [
        leftvote_btn,
        rightvote_btn,
        tie_btn,
        bothbad_btn,
        regenerate_btn,
        clear_btn,
    ]
    # Each vote handler returns five values: the textbox plus the four vote buttons.
    leftvote_btn.click(
        leftvote_last_response,
        states + model_selectors,
        [textbox, leftvote_btn, rightvote_btn, tie_btn, bothbad_btn],
    )
    rightvote_btn.click(
        rightvote_last_response,
        states + model_selectors,
        [textbox, leftvote_btn, rightvote_btn, tie_btn, bothbad_btn],
    )
    tie_btn.click(
        tievote_last_response,
        states + model_selectors,
        [textbox, leftvote_btn, rightvote_btn, tie_btn, bothbad_btn],
    )
    bothbad_btn.click(
        bothbad_vote_last_response,
        states + model_selectors,
        [textbox, leftvote_btn, rightvote_btn, tie_btn, bothbad_btn],
    )
    # Regenerate: reset the last message, run both models, then flash the buttons.
    regenerate_btn.click(
        regenerate, states, states + chatbots + [textbox] + btn_list
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons, [], btn_list
    )
    clear_btn.click(clear_history, None, states + chatbots + [textbox] + btn_list)
    # Client-side snippet: renders #share-region-named with html2canvas,
    # triggers a download named chatbot-arena.png, and returns its four
    # inputs unchanged.
    # NOTE(review): html2canvas is not loaded in this file; confirm the page
    # includes it elsewhere.
    share_js = """
function (a, b, c, d) {
    const captureElement = document.querySelector('#share-region-named');
    html2canvas(captureElement)
        .then(canvas => {
            canvas.style.display = 'none'
            document.body.appendChild(canvas)
            return canvas
        })
        .then(canvas => {
            const image = canvas.toDataURL('image/png')
            const a = document.createElement('a')
            a.setAttribute('download', 'chatbot-arena.png')
            a.setAttribute('href', image)
            a.click()
            canvas.remove()
        });
    return [a, b, c, d];
}
"""
    share_btn.click(share_click, states + model_selectors, [], _js=share_js)
    # Changing either model selection resets both sides.
    for i in range(num_sides):
        model_selectors[i].change(
            clear_history, None, states + chatbots + [textbox] + btn_list
        )
    # Pressing ENTER in the textbox and clicking Send run the same pipeline:
    # add_text -> diffusion_response_multi -> flash_buttons.
    textbox.submit(
        add_text,
        states + model_selectors + [textbox],
        states + chatbots + [textbox] + btn_list,
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons, [], btn_list
    )
    send_btn.click(
        add_text,
        states + model_selectors + [textbox],
        states + chatbots + [textbox] + btn_list,
    ).then(
        diffusion_response_multi,
        states,
        states + chatbots + btn_list,
    ).then(
        flash_buttons, [], btn_list
    )
    return states + model_selectors