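"""Evaluate model predictions on math benchmarks.

Loads a JSONL file of samples (each with a ground-truth answer and a list of
predicted answers), checks every prediction with math_verify, and reports
accuracy, pass accuracy, and unbiased pass@k estimates for k = 1 and powers of two.
"""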
import argparse
import numpy as np
import itertools
from typing import Union, List
from tqdm import tqdm
from pebble import ProcessPool
from concurrent.futures import TimeoutError
# from grader import *
from parser import *
from utils import load_jsonl
from python_executor import PythonExecutor
from math_verify import verify, parse


def new_math_equal_process(params):
    """Verify one (prediction, ground truth) pair with math_verify; returns False on errors."""
    idx, pred, gt = params
    try:
        # Wrap both answers in \boxed{} so math_verify parses them as final answers.
        pred = parse('\\boxed{' + pred + '}')
        gt = parse('\\boxed{' + gt + '}')
        return verify(gt, pred)
    except Exception as e:
        print(f"Error in sample {idx}: {e}")
        return False


def estimate_pass_at_k(
    num_samples: Union[int, List[int], np.ndarray],
    num_correct: Union[List[int], np.ndarray],
    k: int
) -> np.ndarray:
    """
    Estimates pass@k of each problem and returns them in an array.
    """

    def estimator(n: int, c: int, k: int) -> float:
        """
        Calculates 1 - comb(n - c, k) / comb(n, k).
        """
        if n - c < k:
            return 1.0
        return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))

    if isinstance(num_samples, int):
        num_samples_it = itertools.repeat(num_samples, len(num_correct))
    else:
        assert len(num_samples) == len(num_correct)
        num_samples_it = iter(num_samples)

    return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)])
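# Illustrative example (hypothetical values): with 4 samples per problem and
# per-problem correct counts [1, 0, 4], estimate_pass_at_k(4, [1, 0, 4], 2)
# returns [0.5, 0.0, 1.0], i.e. 1 - comb(n - c, k) / comb(n, k) for each problem.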


def evaluate(data_name, prompt_type, samples: list = None, file_path: str = None, max_num_samples=None, execute=False):
    """Score every prediction in `samples` (or in the JSONL at `file_path`) and report accuracy and pass@k."""
    assert samples or file_path, "samples or file_path must be provided"
    if not samples:
        samples = list(load_jsonl(file_path))

    # Deduplicate by idx when present, otherwise assign sequential indices.
    if 'idx' in samples[0]:
        samples = {sample['idx']: sample for sample in samples}.values()
        samples = sorted(samples, key=lambda x: x['idx'])
    else:
        samples = [dict(idx=idx, **sample) for idx, sample in enumerate(samples)]

    if max_num_samples:
        print(f"max_num_samples: {max_num_samples} / {len(samples)}")
        samples = samples[:max_num_samples]

    # parse gt
    for sample in samples:
        sample['gt_cot'], sample['gt'] = parse_ground_truth(sample, data_name)

    # One verification task per (sample, prediction) pair.
    params = [(idx, pred, sample['gt']) for idx, sample in enumerate(samples) for pred in sample['pred']]

    scores = []
    timeout_cnt = 0
    with ProcessPool(max_workers=1) as pool:
        # Each verification runs in a worker process with a 3-second timeout.
        future = pool.map(new_math_equal_process, params, timeout=3)
        iterator = future.result()
        with tqdm(total=len(params), desc="Evaluate") as progress_bar:
            while True:
                try:
                    result = next(iterator)
                    scores.append(result)
                except StopIteration:
                    break
                except TimeoutError as error:
                    print(error)
                    scores.append(False)
                    timeout_cnt += 1
                except Exception as error:
                    print(error.traceback)
                    exit()
                progress_bar.update(1)

    # for debug only
    # import random
    # scores = [random.random() > 0.9 for _ in range(len(params))]
    # Re-split the flat scores list back into per-sample score lists.
    idx = 0
    score_mat = []
    for sample in samples:
        sample['score'] = scores[idx: idx + len(sample['pred'])]
        assert len(sample['score']) == len(sample['pred'])
        score_mat.append(sample['score'])
        idx += len(sample['pred'])

    # Pad shorter rows with their last score so every problem has the same number of samples.
    max_len = max([len(s) for s in score_mat])
    for i, s in enumerate(score_mat):
        if len(s) < max_len:
            score_mat[i] = s + [s[-1]] * (max_len - len(s))  # pad

    # Convert score matrix to numpy array for easier manipulation
    score_mat_np = np.array(score_mat)
    # Calculate number of correct answers per problem
    num_correct = np.sum(score_mat_np, axis=1)

    # Calculate pass@k metrics for k = 1 and powers of 2 up to max_len
    k_values = [1]
    power = 1
    while 2**power <= max_len:
        k_values.append(2**power)
        power += 1
    pass_at_k = {}
    for k in k_values:
        pass_at_k_estimates = estimate_pass_at_k(max_len, num_correct, k)
        pass_at_k[k] = float(np.round(np.mean(pass_at_k_estimates) * 100, decimals=1))

    # Original metrics
    # A problem counts as passed if any of its predictions is correct.
    row_eval = [any(row) for row in score_mat]
    pass_acc = np.mean(row_eval)
    # Overall mean score across all predictions (not a per-column mean).
    col_means = np.array(score_mat).mean()
    mean_score = float(np.round(col_means * 100, decimals=1))
    result_json = {
        "num_samples": len(samples),
        "num_scores": len(scores),
        "timeout_samples": timeout_cnt,
        "empty_samples": len([s for s in samples if not s['pred'][-1]]),
        "acc": mean_score,
        "pass_acc": float(np.round(pass_acc * 100, decimals=1)),
        "pass@k": pass_at_k,
    }

    # Per-type accuracy, using the last prediction's score for each sample.
    if "type" in samples[0]:
        type_scores = {}
        for sample in samples:
            if sample['type'] not in type_scores:
                type_scores[sample['type']] = []
            type_scores[sample['type']].append(sample['score'][-1])
        type_scores = {k: np.round(np.array(v).mean() * 100, decimals=1) for k, v in type_scores.items()}
        type_scores = {k: v for k, v in sorted(type_scores.items(), key=lambda item: item[0])}
        result_json['type_acc'] = type_scores

    print(result_json)
    return samples, result_json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_name", type=str, default="math")
    parser.add_argument("--prompt_type", type=str, default="tool-integrated")
    parser.add_argument("--file_path", type=str, default=None, required=True)
    parser.add_argument("--max_num_samples", type=int, default=None)
    parser.add_argument("--execute", action="store_true")
    args = parser.parse_args()
    return args
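

# Example invocation (script name and file path are illustrative):
#   python evaluate.py --data_name math --file_path outputs/math_test.jsonl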


if __name__ == "__main__":
    args = parse_args()
    evaluate(data_name=args.data_name, prompt_type=args.prompt_type, file_path=args.file_path,
             max_num_samples=args.max_num_samples, execute=args.execute)