# Spaces:
# Running
# Running
import json
import os
import random
import re
from collections import defaultdict
from datetime import datetime, timezone

import gradio as gr
import pandas as pd

from gen_api_answer import get_model_response, parse_model_response
from common import *
# In-memory ELO bookkeeping used by vote() and get_leaderboard()
DEFAULT_ELO = 1500  # rating a model gets the first time it is looked up
K_FACTOR = 32  # standard chess K-factor, adjust as needed
elo_scores = defaultdict(lambda: DEFAULT_ELO)  # model name -> current rating
vote_counts = defaultdict(int)  # model name -> number of votes it took part in
def load_model_data(path='data/models.jsonl'):
    """Load the model registry from a JSONL file.

    Every non-blank line must be a JSON object carrying the keys ``name``,
    ``organization``, ``license`` and ``api_model``.

    Args:
        path: Location of the JSONL file. Defaults to ``data/models.jsonl``,
            the path that was previously hard-coded.

    Returns:
        A dict mapping each model's ``name`` to a dict with its
        ``organization``, ``license`` and ``api_model``. An empty dict is
        returned (after printing a warning) when the file does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks one of the required keys.
    """
    registry = {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                # A trailing or separating blank line is not a record; feeding
                # it to json.loads would raise and abort the whole load.
                if not line:
                    continue
                model = json.loads(line)
                registry[model['name']] = {
                    'organization': model['organization'],
                    'license': model['license'],
                    'api_model': model['api_model'],
                }
    except FileNotFoundError:
        print("Warning: models.jsonl not found")
        return {}
    return registry
# Module-level state shared by the handlers below.
model_data = load_model_data()  # model name -> metadata, loaded once at import
voting_data = []  # votes appended by store_vote_data during this process
current_session_id = 0  # last number handed out by get_new_session_id
def get_new_session_id():
    """Advance the module-level session counter and return the next ``user<N>`` id."""
    global current_session_id
    next_number = current_session_id + 1
    current_session_id = next_number
    return f"user{next_number}"
def store_vote_data(prompt, response_a, response_b, model_a, model_b, winner, judge_id):
    """Append one vote to the in-memory history and rewrite ``voting_data.json``.

    The entry records the current local timestamp (ISO format) together with
    every argument, stored under a key of the same name.
    """
    recorded_at = datetime.now().isoformat()
    voting_data.append({
        "timestamp": recorded_at,
        "prompt": prompt,
        "response_a": response_a,
        "response_b": response_b,
        "model_a": model_a,
        "model_b": model_b,
        "winner": winner,
        "judge_id": judge_id,
    })

    # The complete history is dumped after every vote, overwriting the file.
    with open('voting_data.json', 'w') as out_file:
        json.dump(voting_data, out_file, indent=2)
def parse_variables(prompt):
    """Return the distinct ``{{variable}}`` names found in *prompt*.

    Names are stripped of surrounding whitespace and listed in order of first
    appearance; later duplicates are dropped.
    """
    stripped_names = (name.strip() for name in re.findall(r'{{(.*?)}}', prompt))
    # dict keys keep insertion order, which gives an order-preserving dedup.
    return list(dict.fromkeys(stripped_names))
def get_final_prompt(eval_prompt, variable_values):
    """Fill the ``{{variable}}`` placeholders of *eval_prompt* with their values.

    Placeholders are matched with the same pattern ``parse_variables`` uses,
    and the name is looked up both verbatim and with surrounding whitespace
    stripped, so ``{{ input }}`` is filled from the key ``"input"``.

    Substitution happens in a single pass: text inserted from a value is never
    scanned again, so a value that itself contains ``{{other}}`` is kept
    literally instead of being expanded.

    Args:
        eval_prompt: Template text containing ``{{name}}`` placeholders.
        variable_values: Mapping of variable name to its string replacement.

    Returns:
        The prompt with every known placeholder replaced. Placeholders whose
        name is not in *variable_values* are left untouched.
    """
    def _fill(match):
        raw_name = match.group(1)
        if raw_name in variable_values:
            return variable_values[raw_name]
        return variable_values.get(raw_name.strip(), match.group(0))

    # A callable replacement is inserted verbatim (no backslash processing),
    # matching the literal semantics of str.replace.
    return re.sub(r'{{(.*?)}}', _fill, eval_prompt)
def submit_prompt(eval_prompt, *variable_values):
    """Fill the prompt template and query two randomly chosen models with it.

    Returns a 6-tuple: response A, response B, two ``gr.update`` visibility
    updates, and the names of model A and model B. If anything raises, the
    error is printed and the tuple instead carries two error strings, two
    ``visible=False`` updates and ``None`` for both model names.
    """
    try:
        names = parse_variables(eval_prompt)
        final_prompt = get_final_prompt(eval_prompt, dict(zip(names, variable_values)))

        first, second = random.sample(list(model_data.keys()), 2)
        if random.random() < 0.5:
            model_a, model_b = first, second
        else:
            model_a, model_b = second, first

        response_a = get_model_response(model_a, model_data.get(model_a), final_prompt)
        response_b = get_model_response(model_b, model_data.get(model_b), final_prompt)

        outcome = (
            response_a,
            response_b,
            gr.update(visible=True),
            gr.update(visible=True),
            model_a,
            model_b,
        )
        return outcome
    except Exception as e:
        print(f"Error in submit_prompt: {str(e)}")
        failure_text = "Error generating response"
        return (
            failure_text,
            failure_text,
            gr.update(visible=False),
            gr.update(visible=False),
            None,
            None,
        )
def vote(choice, model_a, model_b, prompt, response_a, response_b, judge_id):
    """Apply one vote to the in-memory ELO table, persist it, and update the UI.

    Args:
        choice: ``'A'`` or ``'B'`` for a win; any other value counts as a tie.
        model_a: Name of the model shown as "Model A".
        model_b: Name of the model shown as "Model B".
        prompt: Prompt text stored with the vote.
        response_a: Output of model A stored with the vote.
        response_b: Output of model B stored with the vote.
        judge_id: Identifier of the voting session, stored with the vote.

    Returns:
        A dict mapping UI components to ``gr.update`` objects: the vote
        buttons row is hidden, both model names are revealed, and the send and
        regenerate buttons are enabled.
    """
    # Use the shared helper so live ratings and the ratings replayed by
    # update_leaderboard() come from a single ELO implementation.
    change_a, change_b = calculate_elo_change(
        elo_scores[model_a], elo_scores[model_b], choice
    )
    elo_scores[model_a] += change_a
    elo_scores[model_b] += change_b
    vote_counts[model_a] += 1
    vote_counts[model_b] += 1

    # Store the vote data
    store_vote_data(prompt, response_a, response_b, model_a, model_b, choice, judge_id)

    # Return updates for UI components
    return {
        action_buttons_row: gr.update(visible=False),
        model_name_a: gr.update(value=f"*Model: {model_a}*"),
        model_name_b: gr.update(value=f"*Model: {model_b}*"),
        send_btn: gr.update(interactive=True),
        regenerate_button: gr.update(visible=True, interactive=True),
    }
def get_leaderboard():
    """Build leaderboard rows from the in-memory ELO scores.

    Only models present in ``model_data`` are listed; a rated model that is
    missing from the registry is skipped instead of raising ``KeyError``.

    Returns:
        A list of dicts sorted by rating, highest first. Each dict has the
        keys ``Model``, ``ELO Score`` (two-decimal string), ``95% CI``
        (string), ``# Votes``, ``Organization`` and ``License``.
    """
    # Sort on the numeric rating itself rather than on its formatted string.
    ranked = sorted(
        ((model, elo) for model, elo in elo_scores.items() if model in model_data),
        key=lambda pair: pair[1],
        reverse=True,
    )

    leaderboard = []
    for model, elo in ranked:
        # .get() avoids inserting a zero entry into the defaultdict.
        votes = vote_counts.get(model, 0)
        ci = 1.96 * (400 / (votes + 1) ** 0.5)  # Approximate 95% confidence interval
        leaderboard.append({
            'Model': model,
            'ELO Score': f"{elo:.2f}",
            '95% CI': f"±{ci:.2f}",
            '# Votes': votes,
            'Organization': model_data[model]['organization'],
            'License': model_data[model]['license'],
        })
    return leaderboard
def regenerate_prompt(model_a, model_b, eval_prompt, *variable_values):
    """Re-run the filled prompt on a new pair of models.

    Models other than *model_a* and *model_b* are preferred; when fewer than
    two such models exist, the pair is drawn from every known model.

    Returns a 9-tuple: score A, critique A, score B, critique B, an update
    showing the vote buttons, two updates resetting the model-name labels,
    and the names of the two models chosen.
    """
    names = parse_variables(eval_prompt)
    final_prompt = get_final_prompt(eval_prompt, dict(zip(names, variable_values)))

    previous_pair = (model_a, model_b)
    fresh_models = [name for name in model_data.keys() if name not in previous_pair]
    # Fall back to the full registry when too few unused models remain.
    pool = fresh_models if len(fresh_models) >= 2 else list(model_data.keys())
    model1, model2 = random.sample(pool, 2)

    response_a = get_model_response(model1, model_data.get(model1), final_prompt)
    response_b = get_model_response(model2, model_data.get(model2), final_prompt)

    # Parse the responses
    score_a, critique_a = parse_model_response(response_a)
    score_b, critique_b = parse_model_response(response_b)

    hidden_name = "*Model: Unknown*"
    return (
        score_a,  # score_a textbox
        critique_a,  # critique_a textbox
        score_b,  # score_b textbox
        critique_b,  # critique_b textbox
        gr.update(visible=True),  # action_buttons_row
        gr.update(value=hidden_name),  # model_name_a
        gr.update(value=hidden_name),  # model_name_b
        model1,  # model_a_state
        model2,  # model_b_state
    )
def calculate_elo_change(rating_a, rating_b, winner, k_factor=None):
    """Calculate ELO rating changes for both players.

    Args:
        rating_a: Current rating of player A.
        rating_b: Current rating of player B.
        winner: ``"A"`` or ``"B"`` for a win; any other value is a tie.
        k_factor: Maximum rating swing per game. ``None`` (the default) uses
            the module-level ``K_FACTOR``.

    Returns:
        A ``(change_a, change_b)`` tuple of rating deltas to add to each
        player's rating.
    """
    if k_factor is None:
        k_factor = K_FACTOR

    expected_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400))
    expected_b = 1 - expected_a

    if winner == "A":
        score_a, score_b = 1, 0
    elif winner == "B":
        score_a, score_b = 0, 1
    else:  # Handle ties
        score_a, score_b = 0.5, 0.5

    change_a = k_factor * (score_a - expected_a)
    change_b = k_factor * (score_b - expected_b)
    return change_a, change_b
def update_leaderboard():
    """Calculate current ELO ratings from voting history.

    Every vote in ``voting_data.json`` is replayed in file order, starting
    each model at ``DEFAULT_ELO``. Votes naming a model that is not in
    ``model_data`` are ignored.

    Returns:
        A ``pandas.DataFrame`` with one row per model in ``model_data`` and
        the columns ``Model``, ``ELO``, ``95% CI``, ``Matches``,
        ``Win Rate``, ``Organization`` and ``License``, sorted by ``ELO``
        descending. When ``voting_data.json`` does not exist, an empty
        DataFrame without columns is returned.
    """
    columns = ['Model', 'ELO', '95% CI', 'Matches', 'Win Rate', 'Organization', 'License']
    ratings = defaultdict(lambda: DEFAULT_ELO)
    matches = defaultdict(int)
    wins = defaultdict(int)

    # Load voting data
    try:
        with open('voting_data.json', 'r') as f:
            recorded_votes = json.load(f)
    except FileNotFoundError:
        return pd.DataFrame()

    # Process each vote
    for entry in recorded_votes:
        model_a = entry['model_a']
        model_b = entry['model_b']
        winner = entry['winner']

        # Skip if models aren't in current model_data
        if model_a not in model_data or model_b not in model_data:
            continue

        # Update match counts
        matches[model_a] += 1
        matches[model_b] += 1
        if winner == "A":
            wins[model_a] += 1
        elif winner == "B":
            wins[model_b] += 1
        else:  # Handle ties
            wins[model_a] += 0.5
            wins[model_b] += 0.5

        # Update ELO ratings
        change_a, change_b = calculate_elo_change(ratings[model_a], ratings[model_b], winner)
        ratings[model_a] += change_a
        ratings[model_b] += change_b

    # Create leaderboard rows, only for current models
    rows = []
    for model, info in model_data.items():
        played = matches[model]
        win_rate = (wins[model] / played * 100) if played > 0 else 0
        ci = 1.96 * (400 / (played + 1) ** 0.5) if played > 0 else 0  # Confidence interval
        rows.append({
            'Model': model,
            'ELO': round(ratings[model], 1),
            '95% CI': f"±{ci:.1f}",
            'Matches': played,
            'Win Rate': f"{win_rate:.1f}%",
            'Organization': info['organization'],
            'License': info['license'],
        })

    # Naming the columns keeps 'ELO' present when there are no rows, so the
    # sort below cannot raise KeyError for an empty model registry.
    df = pd.DataFrame(rows, columns=columns)
    return df.sort_values('ELO', ascending=False).reset_index(drop=True)
def display_leaderboard():
    """Return a Gradio dataframe component showing the replayed leaderboard.

    The frame from ``update_leaderboard`` has seven columns, so the headers
    list all seven (including ``Win Rate``) to line up with the seven
    ``datatype`` entries.
    """
    df = update_leaderboard()
    return gr.DataFrame(
        value=df,
        headers=['Model', 'ELO', '95% CI', 'Matches', 'Win Rate', 'Organization', 'License'],
        datatype=['str', 'number', 'str', 'number', 'str', 'str', 'str'],
        row_count=(len(df) + 1, 'dynamic'),
    )
# Module-level leaderboard table definition.
# NOTE(review): this name is assigned again inside the gr.Blocks layout further
# down, so this instance appears to be unused - confirm before removing.
# One datatype per header (six of each), matching the table built in the UI.
leaderboard_table = gr.Dataframe(
    headers=['Model', 'ELO', '95% CI', 'Matches', 'Organization', 'License'],
    datatype=['str', 'number', 'str', 'number', 'str', 'str']
)
def get_leaderboard_stats():
    """Get summary statistics for the leaderboard.

    Returns:
        A Markdown string with the number of models in ``model_data``, the
        number of votes stored in ``voting_data.json`` and the current time
        in UTC, or ``"No voting data available"`` when the file is missing.
    """
    try:
        with open('voting_data.json', 'r') as f:
            votes = json.load(f)
    except FileNotFoundError:
        return "No voting data available"

    total_votes = len(votes)
    total_models = len(model_data)
    # The label says "UTC", so take the time in UTC rather than local time.
    last_updated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    return f"""
### Leaderboard Stats
- **Total Models**: {total_models}
- **Total Votes**: {total_votes}
- **Last Updated**: {last_updated}
"""
def initialize_voting_data():
    """Write an empty JSON list to ``voting_data.json``, replacing any existing file."""
    with open('voting_data.json', 'w') as out_file:
        out_file.write(json.dumps([]))
# When run as a script, start from an empty vote file; this happens before the
# Gradio interface below is built.
if __name__ == "__main__":
    initialize_voting_data()
# ... rest of your Gradio app setup ...
# Gradio interface: layout, event wiring and launch.
# NOTE(review): the nesting of the layout containers was reconstructed from the
# order of the statements; confirm it against the rendered app.
with gr.Blocks(theme='default', css=CSS_STYLES) as demo:
    # Assigned per browser session by the demo.load hook near the bottom.
    # Calling get_new_session_id() here would run once at build time and give
    # every session the same id.
    judge_id = gr.State()
    gr.Markdown(MAIN_TITLE)
    gr.Markdown(HOW_IT_WORKS)

    with gr.Tabs():
        with gr.TabItem("Judge Arena"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown(BATTLE_RULES)
                    gr.Markdown(EVAL_DESCRIPTION)

            # Eval Prompt and Variables side by side
            with gr.Row():
                # Left column - Eval Prompt
                with gr.Column(scale=1):
                    eval_prompt = gr.TextArea(
                        label="Evaluator Prompt",
                        lines=1,
                        value=DEFAULT_EVAL_PROMPT,
                        placeholder="Type your eval prompt here... denote variables in {{curly brackets}} to be populated on the right.",
                        show_label=True
                    )

                # Right column - Variable Mapping
                with gr.Column(scale=1):
                    gr.Markdown("### Sample to test the evaluator")
                    # Create inputs for up to 5 variables, with first two visible by default
                    variable_rows = []
                    for i in range(5):
                        initial_visibility = i < 2
                        with gr.Group(visible=initial_visibility) as var_row:
                            # Variable input with direct label
                            initial_value = DEFAULT_INPUT if i == 0 else DEFAULT_RESPONSE
                            initial_label = "input" if i == 0 else "response" if i == 1 else f"variable_{i+1}"
                            var_input = gr.Textbox(
                                label=initial_label,
                                value=initial_value,
                                container=True
                            )
                        variable_rows.append((var_row, var_input))

            # Send button
            with gr.Row(elem_classes="send-button-row"):
                send_btn = gr.Button(
                    value="Test the evaluators",
                    variant="primary",
                    size="lg",
                    scale=1
                )

            # Add divider heading for model outputs
            gr.Markdown(VOTING_HEADER)

            # Model Responses side-by-side
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Model A")
                    score_a = gr.Textbox(label="Score", interactive=False)
                    critique_a = gr.TextArea(label="Critique", lines=8, interactive=False)
                    model_name_a = gr.Markdown("*Model: Unknown*")
                with gr.Column():
                    gr.Markdown("### Model B")
                    score_b = gr.Textbox(label="Score", interactive=False)
                    critique_b = gr.TextArea(label="Critique", lines=8, interactive=False)
                    model_name_b = gr.Markdown("*Model: Unknown*")

            # Initially hide vote buttons and regenerate button
            with gr.Row(visible=False) as action_buttons_row:
                vote_a = gr.Button("Choose A", variant="primary")
                vote_tie = gr.Button("Tie", variant="secondary")
                vote_b = gr.Button("Choose B", variant="primary")
            regenerate_button = gr.Button("Regenerate with different models", variant="secondary", visible=False)

            # Add spacing and acknowledgements at the bottom
            gr.Markdown(ACKNOWLEDGEMENTS)

        with gr.TabItem("Leaderboard"):
            refresh_button = gr.Button("Refresh")
            stats_display = gr.Markdown()
            leaderboard_table = gr.Dataframe(
                headers=['Model', 'ELO', '95% CI', 'Matches', 'Organization', 'License'],
                datatype=['str', 'number', 'str', 'number', 'str', 'str']
            )

        with gr.TabItem("Policy"):
            gr.Markdown(POLICY_CONTENT)

    # Define state variables for model tracking
    model_a_state = gr.State()
    model_b_state = gr.State()

    # Flat list of the five variable textboxes, reused by every handler below.
    variable_inputs = [box for _, box in variable_rows]

    # Update variable inputs based on the eval prompt
    def update_variables(eval_prompt):
        """Show one labelled textbox per variable in the prompt and hide the rest.

        Returns a flat list of updates in the order group, textbox, group,
        textbox, ... for all five variable slots.
        """
        variables = parse_variables(eval_prompt)
        updates = []
        for i in range(5):
            if i < len(variables):
                # Set default values for 'input' and 'response', otherwise leave empty
                if variables[i] == "input":
                    initial_value = DEFAULT_INPUT
                elif variables[i] == "response":
                    initial_value = DEFAULT_RESPONSE
                else:
                    initial_value = ""  # Empty for new variables
                updates.extend([
                    gr.update(visible=True),  # var_row
                    gr.update(value=initial_value, label=variables[i], visible=True)  # var_input with dynamic label
                ])
            else:
                updates.extend([
                    gr.update(visible=False),  # var_row
                    gr.update(value="", visible=False)  # var_input
                ])
        return updates

    eval_prompt.change(
        fn=update_variables,
        inputs=eval_prompt,
        outputs=[item for sublist in variable_rows for item in sublist]
    )

    # Regenerate button functionality
    regenerate_button.click(
        fn=regenerate_prompt,
        inputs=[model_a_state, model_b_state, eval_prompt] + variable_inputs,
        outputs=[
            score_a,
            critique_a,
            score_b,
            critique_b,
            action_buttons_row,
            model_name_a,
            model_name_b,
            model_a_state,
            model_b_state
        ]
    )

    # Update model names after responses are generated
    def update_model_names(model_a, model_b):
        """Return label updates that reveal both model names."""
        return gr.update(value=f"*Model: {model_a}*"), gr.update(value=f"*Model: {model_b}*")

    # Store the last submitted prompt and variables for comparison.
    # This state is only read and written through event inputs/outputs so each
    # session keeps its own copy; touching ``.value`` inside a handler would
    # change the shared default instead.
    last_submission = gr.State({})

    # Update the vote button click handlers
    # NOTE(review): vote() receives the score textboxes as response_a /
    # response_b and the prompt template as prompt - confirm that this is what
    # should be stored with each vote.
    vote_a.click(
        fn=lambda *args: vote('A', *args),
        inputs=[model_a_state, model_b_state, eval_prompt, score_a, score_b, judge_id],
        outputs=[action_buttons_row, model_name_a, model_name_b, send_btn, regenerate_button]
    )
    vote_b.click(
        fn=lambda *args: vote('B', *args),
        inputs=[model_a_state, model_b_state, eval_prompt, score_a, score_b, judge_id],
        outputs=[action_buttons_row, model_name_a, model_name_b, send_btn, regenerate_button]
    )
    vote_tie.click(
        fn=lambda *args: vote('Tie', *args),
        inputs=[model_a_state, model_b_state, eval_prompt, score_a, score_b, judge_id],
        outputs=[action_buttons_row, model_name_a, model_name_b, send_btn, regenerate_button]
    )

    # Update the send button handler to store the submitted inputs
    def submit_and_store(prompt, *variables):
        """Run the prompt on two models and remember what was submitted.

        Returns the parsed scores and critiques, UI updates, the chosen model
        names, the reset model-name labels and, last, the submission record
        that is written to ``last_submission``.
        """
        # Stored as a list so it compares equal to what handle_input_changes builds
        current_submission = {"prompt": prompt, "variables": list(variables)}

        # Get the responses
        response_a, response_b, buttons_visible, _, model_a, model_b = submit_prompt(prompt, *variables)

        # Parse the responses
        parsed_score_a, parsed_critique_a = parse_model_response(response_a)
        parsed_score_b, parsed_critique_b = parse_model_response(response_b)

        return (
            parsed_score_a,
            parsed_critique_a,
            parsed_score_b,
            parsed_critique_b,
            buttons_visible,
            gr.update(visible=True, interactive=True),  # Show and enable regenerate button
            model_a,
            model_b,
            gr.update(value="*Model: Unknown*"),
            gr.update(value="*Model: Unknown*"),
            current_submission,  # last_submission state
        )

    send_btn.click(
        fn=submit_and_store,
        inputs=[eval_prompt] + variable_inputs,
        outputs=[
            score_a,
            critique_a,
            score_b,
            critique_b,
            action_buttons_row,
            regenerate_button,
            model_a_state,
            model_b_state,
            model_name_a,  # Add model name outputs
            model_name_b,
            last_submission
        ]
    )

    # Update the input change handlers to also disable regenerate button
    def handle_input_changes(last_inputs, prompt, *variables):
        """Enable send button and manage regenerate button based on input changes"""
        current_inputs = {"prompt": prompt, "variables": list(variables)}
        inputs_changed = last_inputs != current_inputs
        return [
            gr.update(interactive=True),  # send button always enabled
            gr.update(interactive=not inputs_changed)  # regenerate button disabled if inputs changed
        ]

    # Update the change handlers for prompt and variables
    for watched in [eval_prompt] + variable_inputs:
        watched.change(
            fn=handle_input_changes,
            inputs=[last_submission, eval_prompt] + variable_inputs,
            outputs=[send_btn, regenerate_button]
        )

    # Update the leaderboard
    def refresh_leaderboard():
        """Return updates for the leaderboard table and the stats Markdown."""
        leaderboard = get_leaderboard()
        data = [
            [
                entry['Model'],
                float(entry['ELO Score']),
                entry['95% CI'],
                entry['# Votes'],
                entry['Organization'],
                entry['License']
            ] for entry in leaderboard
        ]
        stats = get_leaderboard_stats()
        return [gr.update(value=data), gr.update(value=stats)]

    refresh_button.click(
        fn=refresh_leaderboard,
        inputs=None,
        outputs=[leaderboard_table, stats_display]
    )

    # Give each page load its own judge id
    demo.load(
        fn=get_new_session_id,
        inputs=None,
        outputs=judge_id
    )

    # Add the load event at the very end, just before demo.launch()
    demo.load(
        fn=refresh_leaderboard,
        inputs=None,
        outputs=[leaderboard_table, stats_display]
    )

demo.launch()