|
import argparse |
|
import numpy as np |
|
import itertools |
|
from typing import Union, List |
|
from tqdm import tqdm |
|
from pebble import ProcessPool |
|
from concurrent.futures import TimeoutError |
|
|
|
|
|
|
|
from parser import * |
|
from utils import load_jsonl |
|
from python_executor import PythonExecutor |
|
from math_verify import verify, parse |
|
|
|
|
|
def new_math_equal_process(params): |
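    """Compare one predicted answer against its ground truth with math_verify.

    Returns True when the two answers are judged equivalent, and False on a
    mismatch or on any parsing/verification error.
    """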
|
idx, pred, gt = params |
|
try: |
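        # Wrap both strings in \boxed{...} so math_verify parses them as final LaTeX answers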
|
pred = parse('\\boxed{' + pred + '}') |
|
gt = parse('\\boxed{' + gt + '}') |
|
return verify(gt, pred) |
|
except Exception as e: |
|
print(f"Error in sample {idx}: {e}") |
|
return False |
|
|
|
def estimate_pass_at_k( |
|
num_samples: Union[int, List[int], np.ndarray], |
|
num_correct: Union[List[int], np.ndarray], |
|
k: int |
|
) -> np.ndarray: |
|
""" |
|
Estimates pass@k of each problem and returns them in an array. |
|
""" |
|
|
|
def estimator(n: int, c: int, k: int) -> float: |
|
""" |
|
Calculates 1 - comb(n - c, k) / comb(n, k). |
|
""" |
|
if n - c < k: |
|
return 1.0 |
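        # 1 - C(n-c, k) / C(n, k): the chance that at least one of k samples
        # drawn without replacement is correct, computed as a running product
        # to avoid large binomial coefficients.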
|
return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1)) |
|
|
|
if isinstance(num_samples, int): |
|
num_samples_it = itertools.repeat(num_samples, len(num_correct)) |
|
else: |
|
assert len(num_samples) == len(num_correct) |
|
num_samples_it = iter(num_samples) |
|
|
|
return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)]) |
|
|
|
|
|
def evaluate(data_name, prompt_type, samples: list=None, file_path: str=None, max_num_samples=None, execute=False): |
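    """Score model predictions against ground-truth answers and report accuracy metrics.

    Provide either `samples` (a list of dicts, each with a 'pred' list of predicted
    answers plus whatever fields parse_ground_truth needs for `data_name`) or
    `file_path` pointing to a JSONL file of such dicts. Returns the indexed samples
    and a summary dict of metrics.
    """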
|
assert samples or file_path, "samples or file_path must be provided" |
|
if not samples: |
|
samples = list(load_jsonl(file_path)) |
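    # Deduplicate by 'idx' (later entries win) and sort; otherwise assign sequential indices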
|
if 'idx' in samples[0]: |
|
samples = {sample['idx']: sample for sample in samples}.values() |
|
samples = sorted(samples, key=lambda x: x['idx']) |
|
else: |
|
samples = [dict(idx=idx, **sample) for idx, sample in enumerate(samples)] |
|
|
|
if max_num_samples: |
|
print(f"max_num_samples: {max_num_samples} / {len(samples)}") |
|
samples = samples[:max_num_samples] |
|
|
|
|
|
for sample in samples: |
|
sample['gt_cot'], sample['gt'] = parse_ground_truth(sample, data_name) |
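    # One scoring task per (sample, prediction) pair; scores come back in this flat order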
|
params = [(idx, pred, sample['gt']) for idx, sample in enumerate(samples) for pred in sample['pred']] |
|
|
|
scores = [] |
|
timeout_cnt = 0 |
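    # Score predictions in a worker process with a 3-second per-task timeout,
    # so a single slow comparison cannot hang the whole evaluation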
|
|
|
with ProcessPool(max_workers=1) as pool: |
|
future = pool.map(new_math_equal_process, params, timeout=3) |
|
iterator = future.result() |
|
        with tqdm(total=len(params), desc="Evaluate") as progress_bar:
|
while True: |
|
try: |
|
result = next(iterator) |
|
scores.append(result) |
|
except StopIteration: |
|
break |
|
except TimeoutError as error: |
|
print(error) |
|
scores.append(False) |
|
timeout_cnt += 1 |
|
except Exception as error: |
|
print(error.traceback) |
|
exit() |
|
progress_bar.update(1) |
|
|
|
|
|
|
|
|
|
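    # Re-group the flat score list into one score list per sample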
idx = 0 |
|
score_mat = [] |
|
for sample in samples: |
|
sample['score'] = scores[idx: idx+len(sample['pred'])] |
|
assert len(sample['score']) == len(sample['pred']) |
|
score_mat.append(sample['score']) |
|
idx += len(sample['pred']) |
|
|
|
max_len = max([len(s) for s in score_mat]) |
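    # Samples may have different numbers of predictions; pad shorter rows by
    # repeating their last score so the score matrix is rectangular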
|
|
|
for i, s in enumerate(score_mat): |
|
if len(s) < max_len: |
|
score_mat[i] = s + [s[-1]] * (max_len - len(s)) |
|
|
|
|
|
score_mat_np = np.array(score_mat) |
|
|
|
|
|
num_correct = np.sum(score_mat_np, axis=1) |
|
|
|
|
|
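    # Evaluate pass@k for k = 1, 2, 4, ... up to the number of predictions per sample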
k_values = [1] |
|
power = 1 |
|
while 2**power <= max_len: |
|
k_values.append(2**power) |
|
power += 1 |
|
|
|
pass_at_k = {} |
|
for k in k_values: |
|
pass_at_k_estimates = estimate_pass_at_k(max_len, num_correct, k) |
|
pass_at_k[k] = float(np.round(np.mean(pass_at_k_estimates) * 100, decimals=1)) |
|
|
|
|
|
|
|
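    # pass_acc: fraction of problems with at least one correct prediction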
row_eval = [any(row) for row in score_mat] |
|
|
|
|
|
pass_acc = np.mean(row_eval) |
|
|
|
    mean_score = float(np.round(score_mat_np.mean() * 100, decimals=1))
|
|
|
    result_json = {

        "num_samples": len(samples),

        "num_scores": len(scores),

        "timeout_samples": timeout_cnt,

        "empty_samples": len([s for s in samples if not s['pred'][-1]]),

        "acc": mean_score,

        "pass_acc": float(np.round(pass_acc * 100, decimals=1)),

        "pass@k": pass_at_k,

    }
|
|
|
|
|
if "type" in samples[0]: |
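        # Per-type accuracy, based on each sample's last prediction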
|
type_scores = {} |
|
for sample in samples: |
|
if sample['type'] not in type_scores: |
|
type_scores[sample['type']] = [] |
|
type_scores[sample['type']].append(sample['score'][-1]) |
|
        type_scores = {k: float(np.round(np.array(v).mean() * 100, decimals=1)) for k, v in type_scores.items()}
|
type_scores = {k: v for k, v in sorted(type_scores.items(), key=lambda item: item[0])} |
|
result_json['type_acc'] = type_scores |
|
|
|
print(result_json) |
|
return samples, result_json |
|
|
|
|
|
def parse_args(): |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--data_name", type=str, default="math") |
|
parser.add_argument("--prompt_type", type=str, default="tool-integrated") |
|
parser.add_argument("--file_path", type=str, default=None, required=True) |
|
parser.add_argument("--max_num_samples", type=int, default=None) |
|
parser.add_argument("--execute", action="store_true") |
|
args = parser.parse_args() |
|
return args |
|
|
|
if __name__ == "__main__": |
|
args = parse_args() |
|
evaluate(data_name=args.data_name, prompt_type=args.prompt_type, file_path=args.file_path, |
|
max_num_samples=args.max_num_samples, execute=args.execute) |
|
|