Spaces:
Running
on
T4
Running
on
T4
import argparse | |
import json | |
import os | |
import openai | |
import tqdm | |
import ray | |
import time | |
NUM_SECONDS_TO_SLEEP = 3 | |
def get_eval(content: str, max_tokens: int): | |
while True: | |
try: | |
response = openai.ChatCompletion.create( | |
model='gpt-4', | |
messages=[{ | |
'role': 'system', | |
'content': 'You are a helpful and precise assistant for checking the quality of the answer.' | |
}, { | |
'role': 'user', | |
'content': content, | |
}], | |
temperature=0.2, # TODO: figure out which temperature is best for evaluation | |
max_tokens=max_tokens, | |
) | |
break | |
except openai.error.RateLimitError: | |
pass | |
except Exception as e: | |
print(e) | |
time.sleep(NUM_SECONDS_TO_SLEEP) | |
print('success!') | |
return response['choices'][0]['message']['content'] | |
def parse_score(review): | |
try: | |
score_pair = review.split('\n')[0] | |
score_pair = score_pair.replace(',', ' ') | |
sp = score_pair.split(' ') | |
if len(sp) == 2: | |
return [float(sp[0]), float(sp[1])] | |
else: | |
print('error', review) | |
return [-1, -1] | |
except Exception as e: | |
print(e) | |
print('error', review) | |
return [-1, -1] | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='ChatGPT-based QA evaluation.') | |
parser.add_argument('-q', '--question') | |
# parser.add_argument('-a', '--answer') | |
parser.add_argument('-a', '--answer-list', nargs='+', default=[]) | |
parser.add_argument('-r', '--rule') | |
parser.add_argument('-o', '--output') | |
parser.add_argument('--max-tokens', type=int, default=1024, help='maximum number of tokens produced in the output') | |
args = parser.parse_args() | |
ray.init() | |
f_q = open(os.path.expanduser(args.question)) | |
f_ans1 = open(os.path.expanduser(args.answer_list[0])) | |
f_ans2 = open(os.path.expanduser(args.answer_list[1])) | |
rule_dict = json.load(open(os.path.expanduser(args.rule), 'r')) | |
review_file = open(f'{args.output}', 'w') | |
js_list = [] | |
handles = [] | |
idx = 0 | |
for ques_js, ans1_js, ans2_js in zip(f_q, f_ans1, f_ans2): | |
# if idx == 1: | |
# break | |
ques = json.loads(ques_js) | |
ans1 = json.loads(ans1_js) | |
ans2 = json.loads(ans2_js) | |
category = json.loads(ques_js)['category'] | |
if category in rule_dict: | |
rule = rule_dict[category] | |
else: | |
rule = rule_dict['default'] | |
prompt = rule['prompt'] | |
role = rule['role'] | |
content = (f'[Question]\n{ques["text"]}\n\n' | |
f'[{role} 1]\n{ans1["text"]}\n\n[End of {role} 1]\n\n' | |
f'[{role} 2]\n{ans2["text"]}\n\n[End of {role} 2]\n\n' | |
f'[System]\n{prompt}\n\n') | |
js_list.append({ | |
'id': idx+1, | |
'question_id': ques['question_id'], | |
'answer1_id': ans1['answer_id'], | |
'answer2_id': ans2['answer_id'], | |
'category': category}) | |
idx += 1 | |
handles.append(get_eval.remote(content, args.max_tokens)) | |
# To avoid the rate limit set by OpenAI | |
time.sleep(NUM_SECONDS_TO_SLEEP) | |
reviews = ray.get(handles) | |
for idx, review in enumerate(reviews): | |
scores = parse_score(review) | |
js_list[idx]['content'] = review | |
js_list[idx]['tuple'] = scores | |
review_file.write(json.dumps(js_list[idx]) + '\n') | |
review_file.close() | |