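"""Evaluate model answers on math benchmarks.

Predictions and ground truths are parsed and compared with math_verify inside a
pebble ProcessPool (to bound per-comparison time), and aggregate acc, pass_acc,
and unbiased pass@k estimates are reported.
"""
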
import argparse
import numpy as np
import itertools
from typing import Union, List
from tqdm import tqdm
from pebble import ProcessPool
from concurrent.futures import TimeoutError
# from grader import *
from parser import *
from utils import load_jsonl
from python_executor import PythonExecutor
from math_verify import verify, parse


def new_math_equal_process(params):
    """Check one (prediction, ground truth) pair with math_verify."""
    idx, pred, gt = params
    try:
        # Wrap both strings in \boxed{} so math_verify's LaTeX parser accepts them.
        pred = parse('\\boxed{' + pred + '}')
        gt = parse('\\boxed{' + gt + '}')
        return verify(gt, pred)
    except Exception as e:
        print(f"Error in sample {idx}: {e}")
        return False
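
# Illustrative check (hypothetical values): new_math_equal_process((0, "\\frac{1}{2}", "0.5"))
# parses both answers via math_verify and returns True when they are mathematically
# equivalent, False on a mismatch or on any parsing error.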


def estimate_pass_at_k(
    num_samples: Union[int, List[int], np.ndarray],
    num_correct: Union[List[int], np.ndarray],
    k: int
) -> np.ndarray:
    """
    Estimates pass@k of each problem and returns them in an array.
    """

    def estimator(n: int, c: int, k: int) -> float:
        """
        Calculates 1 - comb(n - c, k) / comb(n, k).
        """
        if n - c < k:
            return 1.0
        return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))

    if isinstance(num_samples, int):
        num_samples_it = itertools.repeat(num_samples, len(num_correct))
    else:
        assert len(num_samples) == len(num_correct)
        num_samples_it = iter(num_samples)

    return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)])
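
# Worked example (illustrative): with n=4 samples per problem and c=1 correct,
# pass@1 = 1 - C(3,1)/C(4,1) = 0.25, while pass@4 = 1.0 because n - c < k.
#   estimate_pass_at_k(4, [1], 1) -> array([0.25])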


def evaluate(data_name, prompt_type, samples: list = None, file_path: str = None, max_num_samples=None, execute=False):
    assert samples or file_path, "samples or file_path must be provided"
    if not samples:
        samples = list(load_jsonl(file_path))

    # Deduplicate by 'idx' (keeping the last occurrence) or assign sequential indices.
    if 'idx' in samples[0]:
        samples = {sample['idx']: sample for sample in samples}.values()
        samples = sorted(samples, key=lambda x: x['idx'])
    else:
        samples = [dict(idx=idx, **sample) for idx, sample in enumerate(samples)]

    if max_num_samples:
        print(f"max_num_samples: {max_num_samples} / {len(samples)}")
        samples = samples[:max_num_samples]

    # parse gt
    for sample in samples:
        sample['gt_cot'], sample['gt'] = parse_ground_truth(sample, data_name)

    # One scoring task per (sample, prediction) pair.
    params = [(idx, pred, sample['gt']) for idx, sample in enumerate(samples) for pred in sample['pred']]

    scores = []
    timeout_cnt = 0

    with ProcessPool(max_workers=1) as pool:
        future = pool.map(new_math_equal_process, params, timeout=3)
        iterator = future.result()
        with tqdm(total=len(params), desc="Evaluate") as progress_bar:
            while True:
                try:
                    result = next(iterator)
                    scores.append(result)
                except StopIteration:
                    break
                except TimeoutError as error:
                    # A single comparison exceeded the 3-second budget; count it as wrong.
                    print(error)
                    scores.append(False)
                    timeout_cnt += 1
                except Exception as error:
                    print(error.traceback)
                    exit()
                progress_bar.update(1)

    # for debug only
    # import random
    # scores = [random.random() > 0.9 for _ in range(len(params))]

    # Re-group the flat score list back into one row per sample.
    idx = 0
    score_mat = []
    for sample in samples:
        sample['score'] = scores[idx: idx + len(sample['pred'])]
        assert len(sample['score']) == len(sample['pred'])
        score_mat.append(sample['score'])
        idx += len(sample['pred'])

    # Pad shorter rows with their last score so the matrix is rectangular.
    max_len = max([len(s) for s in score_mat])
    for i, s in enumerate(score_mat):
        if len(s) < max_len:
            score_mat[i] = s + [s[-1]] * (max_len - len(s))  # pad

    # Convert score matrix to numpy array for easier manipulation
    score_mat_np = np.array(score_mat)
    # Calculate number of correct answers per problem
    num_correct = np.sum(score_mat_np, axis=1)

    # Calculate pass@k for k = 1, 2, 4, ... up to the number of samples per problem
    k_values = [1]
    power = 1
    while 2 ** power <= max_len:
        k_values.append(2 ** power)
        power += 1

    pass_at_k = {}
    for k in k_values:
        pass_at_k_estimates = estimate_pass_at_k(max_len, num_correct, k)
        pass_at_k[k] = float(np.round(np.mean(pass_at_k_estimates) * 100, decimals=1))

    # Original metrics
    # Convert each row to a single boolean indicating if any True exists in the row
    row_eval = [any(row) for row in score_mat]
    # Calculate the average
    pass_acc = np.mean(row_eval)

    # Mean over all individual prediction scores (overall accuracy)
    col_means = np.array(score_mat).mean()
    mean_score = float(np.round(col_means * 100, decimals=1))

    result_json = {
        "num_samples": len(samples),
        "num_scores": len(scores),
        "timeout_samples": timeout_cnt,
        "empty_samples": len([s for s in samples if not s['pred'][-1]]),
        "acc": mean_score,
        "pass_acc": float(np.round(pass_acc * 100, decimals=1)),
        "pass@k": pass_at_k,
    }

    # each type score
    if "type" in samples[0]:
        type_scores = {}
        for sample in samples:
            if sample['type'] not in type_scores:
                type_scores[sample['type']] = []
            type_scores[sample['type']].append(sample['score'][-1])
        type_scores = {k: np.round(np.array(v).mean() * 100, decimals=1) for k, v in type_scores.items()}
        type_scores = {k: v for k, v in sorted(type_scores.items(), key=lambda item: item[0])}
        result_json['type_acc'] = type_scores

    print(result_json)
    return samples, result_json
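
# Minimal usage sketch for calling evaluate() with in-memory samples (hypothetical
# fields; parse_ground_truth must know how to extract the answer for the data_name):
#
#   samples, result = evaluate(
#       data_name="math",
#       prompt_type="cot",
#       samples=[{"question": "What is 1 + 1?", "pred": ["2"], "answer": "2"}],
#   )
#   # result["acc"], result["pass_acc"], and result["pass@k"] hold the aggregate metrics.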


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_name", type=str, default="math")
    parser.add_argument("--prompt_type", type=str, default="tool-integrated")
    parser.add_argument("--file_path", type=str, default=None, required=True)
    parser.add_argument("--max_num_samples", type=int, default=None)
    parser.add_argument("--execute", action="store_true")
    args = parser.parse_args()
    return args
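
# Example invocation (script name and file path are hypothetical):
#   python math_eval.py --data_name math --file_path outputs/math_predictions.jsonl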


if __name__ == "__main__":
    args = parse_args()
    evaluate(data_name=args.data_name, prompt_type=args.prompt_type, file_path=args.file_path,
             max_num_samples=args.max_num_samples, execute=args.execute)