"""
Common data structures and utilities.
"""

import ast
import dataclasses
import glob
import json
import os
import re
import time
from typing import Optional

import openai
import anthropic

from fastchat.model.model_adapter import get_conversation_template

API_MAX_RETRY = 16
API_RETRY_SLEEP = 10
API_ERROR_OUTPUT = "$ERROR$"

# Two scores within TIE_DELTA of each other are treated as a tie.
TIE_DELTA = 0.1

# Categories that are graded against a reference answer.
NEED_REF_CATS = ["math", "reasoning", "coding"]

two_score_pattern = re.compile(r"\[\[(\d+\.?\d*),\s?(\d+\.?\d*)\]\]")
two_score_pattern_backup = re.compile(r"\[(\d+\.?\d*),\s?(\d+\.?\d*)\]")
one_score_pattern = re.compile(r"\[\[(\d+\.?\d*)\]\]")
one_score_pattern_backup = re.compile(r"\[(\d+\.?\d*)\]")

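# Illustrative matches for the score patterns above:
#   "Rating: [[8.5]]"   -> one_score_pattern captures "8.5"
#   "Rating: [8.5]"     -> one_score_pattern_backup captures "8.5"
#   "Scores: [[7, 9]]"  -> two_score_pattern captures ("7", "9")
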
temperature_config = {
    "writing": 0.7,
    "roleplay": 0.7,
    "extraction": 0.0,
    "math": 0.0,
    "coding": 0.0,
    "reasoning": 0.0,
    "stem": 0.1,
    "humanities": 0.1,
}

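# Example: answers to a "writing" question are sampled at temperature 0.7,
# while "math" answers use temperature 0.0.
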
# Maps each side to the other; used when normalizing swapped game keys.
reverse_model_map = {
    "model_1": "model_2",
    "model_2": "model_1",
}


@dataclasses.dataclass
class Judge:
    model_name: str
    prompt_template: dict
    ref_based: bool = False
    multi_turn: bool = False


@dataclasses.dataclass
class MatchSingle:
    question: dict
    model: str
    answer: dict
    judge: Judge
    ref_answer: Optional[dict] = None
    multi_turn: bool = False


@dataclasses.dataclass
class MatchPair:
    question: dict
    model_1: str
    model_2: str
    answer_1: dict
    answer_2: dict
    judge: Judge
    ref_answer: Optional[dict] = None
    multi_turn: bool = False


def load_questions(question_file: str, begin: Optional[int], end: Optional[int]):
    """Load questions from a JSONL file, optionally slicing to [begin, end)."""
    questions = []
    with open(question_file, "r") as ques_file:
        for line in ques_file:
            if line.strip():
                questions.append(json.loads(line))
    questions = questions[begin:end]
    return questions

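# Usage sketch (hypothetical path): read the first ten questions of a JSONL
# file in which each line is one question object.
#   questions = load_questions("data/mt_bench/question.jsonl", 0, 10)

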
def load_model_answers(answer_dir: str):
    """Load model answers.

    The return value is a python dict of type:
    Dict[model_name: str -> Dict[question_id: int -> answer: dict]]
    """
    filenames = glob.glob(os.path.join(answer_dir, "*.jsonl"))
    filenames.sort()
    model_answers = {}

    for filename in filenames:
        model_name = os.path.basename(filename)[:-6]  # strip the ".jsonl" suffix
        answer = {}
        with open(filename) as fin:
            for line in fin:
                line = json.loads(line)
                answer[line["question_id"]] = line
        model_answers[model_name] = answer

    return model_answers

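# Example shape of the return value (illustrative):
#   {"gpt-4": {81: {"question_id": 81, "choices": [...]}, ...},
#    "vicuna-13b": {...}}

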
def load_judge_prompts(prompt_file: str):
    """Load judge prompts.

    The return value is a python dict of type:
    Dict[judge_name: str -> dict]
    """
    prompts = {}
    with open(prompt_file) as fin:
        for line in fin:
            line = json.loads(line)
            prompts[line["name"]] = line
    return prompts


def run_judge_single(question, answer, judge, ref_answer, multi_turn=False):
    """Grade one answer with an LLM judge and parse the numeric rating."""
    kwargs = {}
    model = judge.model_name
    if ref_answer is not None:
        kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0]
        kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1]

    if multi_turn:
        user_prompt = judge.prompt_template["prompt_template"].format(
            question_1=question["turns"][0],
            question_2=question["turns"][1],
            answer_1=answer["choices"][0]["turns"][0],
            answer_2=answer["choices"][0]["turns"][1],
            **kwargs,
        )
    else:
        user_prompt = judge.prompt_template["prompt_template"].format(
            question=question["turns"][0],
            answer=answer["choices"][0]["turns"][0],
            **kwargs,
        )

    rating = -1

    system_prompt = judge.prompt_template["system_prompt"]
    conv = get_conversation_template(model)
    conv.system = system_prompt
    conv.append_message(conv.roles[0], user_prompt)
    conv.append_message(conv.roles[1], None)

    if model in ["gpt-3.5-turbo", "gpt-4"]:
        judgment = chat_compeletion_openai(model, conv, temperature=0, max_tokens=2048)
    elif model in ["claude-v1", "claude-instant-v1"]:
        judgment = chat_compeletion_anthropic(
            model, conv, temperature=0, max_tokens=1024
        )
    else:
        raise ValueError(f"Invalid judge model name: {model}")

    if judge.prompt_template["output_format"] == "[[rating]]":
        match = re.search(one_score_pattern, judgment)
        if not match:
            match = re.search(one_score_pattern_backup, judgment)

        if match:
            rating = ast.literal_eval(match.groups()[0])
        else:
            rating = -1
    else:
        raise ValueError(
            f"invalid output format: {judge.prompt_template['output_format']}"
        )

    return rating, user_prompt, judgment

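# Minimal usage sketch (hypothetical prompt template): grade one single-turn
# answer with a GPT-4 judge that emits a "[[rating]]"-formatted score.
#   judge = Judge("gpt-4", {"name": "single-v1", "type": "single",
#                           "system_prompt": "You are a helpful assistant.",
#                           "prompt_template": "{question}\n{answer}",
#                           "output_format": "[[rating]]"})
#   rating, prompt, judgment = run_judge_single(question, answer, judge, None)

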
def play_a_match_single(match: MatchSingle, output_file: str):
    question, model, answer, judge, ref_answer, multi_turn = (
        match.question,
        match.model,
        match.answer,
        match.judge,
        match.ref_answer,
        match.multi_turn,
    )

    if judge.prompt_template["type"] == "single":
        score, user_prompt, judgment = run_judge_single(
            question, answer, judge, ref_answer, multi_turn=multi_turn
        )

        question_id = question["question_id"]
        turn = 1 if not multi_turn else 2
        result = {
            "question_id": question_id,
            "model": model,
            "judge": (judge.model_name, judge.prompt_template["name"]),
            "user_prompt": user_prompt,
            "judgment": judgment,
            "score": score,
            "turn": turn,
            "tstamp": time.time(),
        }
        print(
            f"question: {question_id}, turn: {turn}, model: {model}, "
            f"score: {score}, "
            f"judge: {(judge.model_name, judge.prompt_template['name'])}"
        )
    else:
        raise ValueError(f"invalid judge type: {judge.prompt_template['type']}")

    if output_file:
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        with open(output_file, "a") as fout:
            fout.write(json.dumps(result) + "\n")

    return result


def run_judge_pair(question, answer_a, answer_b, judge, ref_answer, multi_turn=False):
    """Judge a pair of answers and parse the winner ("A", "B", "tie", or "error")."""
    kwargs = {}
    model = judge.model_name
    if ref_answer is not None:
        kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0]
        kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1]

    if multi_turn:
        system_prompt = judge.prompt_template["system_prompt"]
        user_prompt = judge.prompt_template["prompt_template"].format(
            question_1=question["turns"][0],
            question_2=question["turns"][1],
            answer_a_1=answer_a["choices"][0]["turns"][0],
            answer_b_1=answer_b["choices"][0]["turns"][0],
            answer_a_2=answer_a["choices"][0]["turns"][1],
            answer_b_2=answer_b["choices"][0]["turns"][1],
            **kwargs,
        )
    else:
        system_prompt = judge.prompt_template["system_prompt"]
        user_prompt = judge.prompt_template["prompt_template"].format(
            question=question["turns"][0],
            answer_a=answer_a["choices"][0]["turns"][0],
            answer_b=answer_b["choices"][0]["turns"][0],
            **kwargs,
        )

    winner = "error"

    conv = get_conversation_template(model)
    conv.append_message(conv.roles[0], user_prompt)
    conv.append_message(conv.roles[1], None)

    if model in ["gpt-3.5-turbo", "gpt-4"]:
        conv.system = system_prompt
        judgment = chat_compeletion_openai(model, conv, temperature=0, max_tokens=2048)
    elif model in ["claude-v1", "claude-instant-v1"]:
        if system_prompt != "You are a helpful assistant.":
            user_prompt = "[Instruction]\n" + system_prompt + "\n\n" + user_prompt
            conv.messages[0][1] = user_prompt
        judgment = chat_compeletion_anthropic(
            model, conv, temperature=0, max_tokens=1024
        )
    else:
        raise ValueError(f"Invalid judge model name: {model}")

    if judge.prompt_template["output_format"] == "[[A]]":
        if "[[A]]" in judgment:
            winner = "A"
        elif "[[B]]" in judgment:
            winner = "B"
        elif "[[C]]" in judgment:
            winner = "tie"
        else:
            winner = "error"
    elif judge.prompt_template["output_format"] == "[[rating_a,rating_b]]":
        match = re.search(two_score_pattern, judgment)
        if not match:
            match = re.search(two_score_pattern_backup, judgment)
        if match:
            scores = [ast.literal_eval(s.strip()) for s in match.groups()]
            if abs(scores[0] - scores[1]) <= TIE_DELTA:
                winner = "tie"
            elif scores[0] > scores[1]:
                winner = "A"
            else:
                winner = "B"
        else:
            winner = "error"
    else:
        raise ValueError(
            f"invalid output format: {judge.prompt_template['output_format']}"
        )

    return winner, user_prompt, judgment

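# Pairwise judging runs each matchup twice with the answer positions swapped
# (game 1: A=model_1, game 2: A=model_2) to control for judge position bias;
# the two verdicts are mapped back to model_1/model_2 below.

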
def play_a_match_pair(match: MatchPair, output_file: str):
    question, model_1, model_2, answer_1, answer_2, judge, ref_answer, multi_turn = (
        match.question,
        match.model_1,
        match.model_2,
        match.answer_1,
        match.answer_2,
        match.judge,
        match.ref_answer,
        match.multi_turn,
    )

    if judge.prompt_template["type"] == "pairwise":
        g1_winner, g1_user_prompt, g1_judgment = run_judge_pair(
            question, answer_1, answer_2, judge, ref_answer, multi_turn=multi_turn
        )
        g2_winner, g2_user_prompt, g2_judgment = run_judge_pair(
            question, answer_2, answer_1, judge, ref_answer, multi_turn=multi_turn
        )

        g1_map = {"A": "model_1", "B": "model_2"}
        g2_map = {"A": "model_2", "B": "model_1"}
        g1_winner = g1_map.get(g1_winner, g1_winner)
        g2_winner = g2_map.get(g2_winner, g2_winner)
        question_id = question["question_id"]
        turn = 1 if not multi_turn else 2

        result = {
            "question_id": question_id,
            "model_1": model_1,
            "model_2": model_2,
            "g1_winner": g1_winner,
            "g2_winner": g2_winner,
            "judge": (judge.model_name, judge.prompt_template["name"]),
            "g1_user_prompt": g1_user_prompt,
            "g1_judgment": g1_judgment,
            "g2_user_prompt": g2_user_prompt,
            "g2_judgment": g2_judgment,
            "turn": turn,
            "tstamp": time.time(),
        }

        print(
            f"question: {question_id}, turn: {turn}, model_1: {model_1}, model_2: {model_2}, "
            f"g1_winner: {g1_winner}, g2_winner: {g2_winner}, "
            f"judge: {(judge.model_name, judge.prompt_template['name'])}"
        )
    elif judge.prompt_template["type"] == "single":
        m1_score, m1_user_prompt, m1_judgment = run_judge_single(
            question, answer_1, judge, ref_answer, multi_turn=multi_turn
        )
        m2_score, m2_user_prompt, m2_judgment = run_judge_single(
            question, answer_2, judge, ref_answer, multi_turn=multi_turn
        )

        if abs(m1_score - m2_score) <= TIE_DELTA:
            winner = "tie"
        elif m1_score > m2_score:
            winner = "model_1"
        else:
            winner = "model_2"

        question_id = question["question_id"]
        result = {
            "question_id": question_id,
            "model_1": model_1,
            "model_2": model_2,
            "g1_winner": winner,
            "g2_winner": winner,
            "judge": (judge.model_name, judge.prompt_template["name"]),
            "g1_user_prompt": m1_user_prompt,
            "g1_judgment": m1_judgment,
            "g2_user_prompt": m2_user_prompt,
            "g2_judgment": m2_judgment,
            "m1_score": m1_score,
            "m2_score": m2_score,
            "tstamp": time.time(),
        }
        print(
            f"question: {question_id}, model_1: {model_1}, model_2: {model_2}, "
            f"winner: {winner}, m1_score: {m1_score}, m2_score: {m2_score}, "
            f"judge: {(judge.model_name, judge.prompt_template['name'])}"
        )
    else:
        raise ValueError(f"invalid judge type: {judge.prompt_template['type']}")

    if output_file:
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        with open(output_file, "a") as fout:
            fout.write(json.dumps(result) + "\n")

    return result


def chat_compeletion_openai(model, conv, temperature, max_tokens):
    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            messages = conv.to_openai_api_messages()
            response = openai.ChatCompletion.create(
                model=model,
                messages=messages,
                n=1,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            output = response["choices"][0]["message"]["content"]
            break
        except openai.error.OpenAIError as e:
            print(type(e), e)
            time.sleep(API_RETRY_SLEEP)

    return output

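# Note: the call above targets the pre-1.0 `openai` SDK (openai.ChatCompletion,
# openai.error); the 1.x client exposes a different interface.

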
def chat_compeletion_anthropic(model, conv, temperature, max_tokens):
    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            c = anthropic.Client(os.environ["ANTHROPIC_API_KEY"])
            prompt = conv.get_prompt()
            response = c.completion(
                model=model,
                prompt=prompt,
                stop_sequences=[anthropic.HUMAN_PROMPT],
                max_tokens_to_sample=max_tokens,
                temperature=temperature,
            )
            output = response["completion"]
            break
        except anthropic.ApiException as e:
            print(type(e), e)
            time.sleep(API_RETRY_SLEEP)
    return output.strip()

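# Note: the call above targets the legacy `anthropic` completions client
# (anthropic.Client, HUMAN_PROMPT stop sequences), not the newer Messages API.

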
def chat_compeletion_palm(chat_state, model, conv, temperature, max_tokens):
    from fastchat.serve.api_provider import init_palm_chat

    assert model == "palm-2-chat-bison-001"

    if chat_state is None:
        chat_state = init_palm_chat("chat-bison@001")

    parameters = {
        "temperature": temperature,
        "top_p": 0.8,
        "top_k": 40,
        "max_output_tokens": max_tokens,
    }
    output = API_ERROR_OUTPUT
    for _ in range(API_MAX_RETRY):
        try:
            response = chat_state.send_message(conv.messages[-2][1], **parameters)
            output = response.text
            break
        except Exception as e:
            print(type(e), e)
            time.sleep(API_RETRY_SLEEP)
    return chat_state, output


def normalize_game_key_single(gamekey, result):
    """Sort the model names in a game key, flipping the result to match."""
    qid, model_1, model_2 = gamekey
    if model_1 < model_2:
        return gamekey, result
    else:
        new_gamekey = (qid, model_2, model_1)
        new_result = {
            "winners": tuple(reverse_model_map.get(x, x) for x in result["winners"]),
            "g1_judgment": result["g2_judgment"],
            "g2_judgment": result["g1_judgment"],
        }
        return new_gamekey, new_result

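# Example: ((81, "vicuna-13b", "alpaca-13b"), result) normalizes to
# ((81, "alpaca-13b", "vicuna-13b"), result'), where each "model_1" winner
# becomes "model_2" (and vice versa) and the two judgments swap places.

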
def normalize_game_key_dict(judgment_dict):
    """Sort the model names in all game keys of a judgment dict."""
    ret = {}
    for key, value in judgment_dict.items():
        new_key, new_value = normalize_game_key_single(key, value)
        ret[new_key] = new_value
    return ret


def load_pairwise_model_judgments(filename: str):
    """Load pairwise model judgments.

    The return value is a dict of type:
    Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict]]
    """
    judge_dict = {}

    with open(filename) as fin:
        for line in fin:
            obj = json.loads(line)
            judge = tuple(obj["judge"])
            qid, model_1, model_2 = obj["question_id"], obj["model_1"], obj["model_2"]

            if judge not in judge_dict:
                judge_dict[judge] = {}

            if "winner" in obj:
                winner = obj["winner"]
            elif "g1_winner" in obj and "g2_winner" in obj:
                g1_winner, g2_winner = obj["g1_winner"], obj["g2_winner"]
                if g1_winner == g2_winner:
                    winner = g1_winner
                else:
                    winner = "inconsistent"
            else:
                raise ValueError(f"Invalid keys: {list(obj.keys())}")

            gamekey = (qid, model_1, model_2)
            winners = (winner,)

            judge_dict[judge][gamekey] = {
                "winners": winners,
                "g1_judgment": obj["g1_judgment"],
                "g2_judgment": obj["g2_judgment"],
            }

    # Sort the model names in the game keys.
    normalized = {}
    for judge, value in judge_dict.items():
        normalized[judge] = normalize_game_key_dict(value)
    return normalized

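# Example entry (illustrative):
#   judge_dict[("gpt-4", "pair-v2")][(81, "alpaca-13b", "vicuna-13b")] ==
#       {"winners": ("model_2",), "g1_judgment": "...", "g2_judgment": "..."}

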
def load_single_model_judgments(filename: str):
    """Load single-answer grading judgments.

    The return value is a dict of type:
    Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict]]
    """
    judge_dict = {}

    with open(filename) as fin:
        for line in fin:
            obj = json.loads(line)
            judge = tuple(obj["judge"])
            qid, model = obj["question_id"], obj["model"]

            if judge not in judge_dict:
                judge_dict[judge] = {}

            gamekey = (qid, model)

            judge_dict[judge][gamekey] = {
                "score": obj["score"],
                "judgment": obj["judgment"],
            }
    return judge_dict


def resolve_pairwise_judgment_dict(
    question, model_judgments_normal, model_judgments_math, multi_turn=False
):
    """Return the correct pairwise judgment dict for a question."""
    if multi_turn:
        if question["category"] in NEED_REF_CATS:
            return model_judgments_math[("gpt-4", "pair-math-v1-multi-turn")]
        return model_judgments_normal[("gpt-4", "pair-v2-multi-turn")]

    if question["category"] in NEED_REF_CATS:
        return model_judgments_math[("gpt-4", "pair-math-v1")]
    else:
        return model_judgments_normal[("gpt-4", "pair-v2")]

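# Example: a second-turn "math" question resolves to the
# ("gpt-4", "pair-math-v1-multi-turn") reference-based judgments.

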
def resolve_single_judgment_dict(
    question, model_judgments_normal, model_judgments_math, multi_turn=False
):
    """Return the correct single-answer grading judgment dict for a question."""
    if multi_turn:
        if question["category"] in NEED_REF_CATS:
            return model_judgments_math[("gpt-4", "single-math-v1-multi-turn")]
        return model_judgments_normal[("gpt-4", "single-v1-multi-turn")]

    if question["category"] in NEED_REF_CATS:
        return model_judgments_math[("gpt-4", "single-math-v1")]
    else:
        return model_judgments_normal[("gpt-4", "single-v1")]


def get_pairwise_judge_explanation(gamekey, judgment_dict):
    """Get model judge explanation."""
    try:
        qid, model_1, model_2 = gamekey
        if model_1 < model_2:
            res = judgment_dict[gamekey]
            g1_judgment, g2_judgment = res["g1_judgment"], res["g2_judgment"]
        else:
            new_gamekey = (qid, model_2, model_1)
            res = judgment_dict[new_gamekey]
            g1_judgment, g2_judgment = res["g2_judgment"], res["g1_judgment"]

        return (
            f"**Game 1**. **A**: {model_1}, **B**: {model_2}\n\n"
            f"**Judgment**: {g1_judgment}"
            "\n\n`--------------------------`\n\n"
            f"**Game 2**. **A**: {model_2}, **B**: {model_1}\n\n"
            f"**Judgment**: {g2_judgment}"
        )
    except KeyError:
        return "N/A"


def get_single_judge_explanation(gamekey, judgment_dict):
    """Get model judge explanation."""
    try:
        qid, model = gamekey

        res = judgment_dict[gamekey]

        g1_judgment = res["judgment"]
        g1_score = res["score"]

        return (
            f"**Game 1**. **A**: {model}, **Score**: {g1_score}\n\n"
            f"**Judgment**: {g1_judgment}"
        )
    except KeyError:
        return "N/A"


def check_data(questions, model_answers, ref_answers, models, judges):
    """Assert that every model answered every question, and that every
    reference-based judge has a reference answer where one is required."""
    for m in models:
        assert m in model_answers, f"Missing model answer for {m}"
        m_answer = model_answers[m]
        for q in questions:
            assert (
                q["question_id"] in m_answer
            ), f"Missing model {m}'s answer to Question {q['question_id']}"

    for jg in judges.values():
        if not jg.ref_based:
            continue
        for q in questions:
            if q["category"] not in NEED_REF_CATS:
                continue
            assert (
                q["question_id"] in ref_answers[jg.model_name]
            ), f"Missing reference answer to Question {q['question_id']} for judge {jg.model_name}"


def get_model_list(answer_dir):
    """List model names from the answer files in a directory."""
    file_paths = glob.glob(f"{answer_dir}/*.jsonl")
    file_names = [os.path.splitext(os.path.basename(f))[0] for f in file_paths]
    return file_names
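

# Example: an answer directory containing "gpt-4.jsonl" and "vicuna-13b.jsonl"
# yields ["gpt-4", "vicuna-13b"].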