import dataclasses import itertools import os import re import tempfile from collections import defaultdict from pathlib import Path import datasets import evaluate import numpy as np from tqdm import tqdm from .execution import execute_predictions STDOUT_PARSE_REGEX = re.compile(r"^TEST-(.+)\.\.\.(.+)$", flags=re.MULTILINE) _CITATION = """\ @article{orlanski2023measuring, title={Measuring The Impact Of Programming Language Distribution}, author={Orlanski, Gabriel and Xiao, Kefan and Garcia, Xavier and Hui, Jeffrey and Howland, Joshua and Malmaud, Jonathan and Austin, Jacob and Singh, Rishah and Catasta, Michele}, journal={arXiv preprint arXiv:2302.01973}, year={2023} } """ _DESCRIPTION = """\ This metric implements the evaluation harness for datasets translated with the BabelCode framework as described in the paper "Measuring The Impact Of Programming Language Distribution" (https://arxiv.org/abs/2302.01973). """ _KWARGS_DESCRIPTION = """ Calculates how many predictions per question pass a set of tests for the given problem. Args: predictions: The list of predictions for each question to execute. languages: The language to use for each question. question_dicts: The information for each question. k: number of code candidates to consider in the evaluation (Default: [1, 10, 100]) num_workers: number of workers used to evaluate the candidate programs (Default: 4). language_timeout: Timeouts to use for each language. If it is not set, will default to the one in the question dict (Default: None). Returns: pass_at_k: dict with pass rates for each k results: dict with granular results of each unittest Examples: >>> bc_eval = evaluate.load("bc_eval") >>> predictions = [["def add(a,b):\n\treturn a+b", "def add(a,b):\n\treturn a-b"]] >>> languages = ["Python"] >>> question_dicts = [{"test_code": "...", "entry_fn_name": "add","entry_cls_name":"Solution", "test_case_ids":["0","1"],"test_list":"..."}] >>> pass_at_k, results = code_eval.compute(predictions=predictions,languages=languages, question_dicts=question_dicts, k=[1, 2]) >>> print(pass_at_k) {'pass@1': 0.5, 'pass@2': 1.0} """ _WARNING = """ ################################################################################ !!!WARNING!!! ################################################################################ The "bc_eval" metric executes untrusted model-generated code in Python. Although it is highly unlikely that model-generated code will do something overtly malicious in response to this test suite, model-generated code may act destructively due to a lack of model capability or alignment. Users are strongly encouraged to sandbox this evaluation suite so that it does not perform destructive actions on their host or network. For more information on how OpenAI sandboxes its code, see the paper "Evaluating Large Language Models Trained on Code" (https://arxiv.org/abs/2107.03374). Once you have read this disclaimer and taken appropriate precautions, set the environment variable HF_ALLOW_CODE_EVAL="1". Within Python you can to this with: >>> import os >>> os.environ["HF_ALLOW_CODE_EVAL"] = "1" ################################################################################\ """ _QUESTION_INFO_KEYS = { "entry_fn_name", "entry_cls_name", "test_code", "test_list", "test_case_ids", } def make_file_and_command(qid, idx, pred, question, working_dir, timeout_override=None): file_name = f"pred.{question['extension']}" pred_dir = working_dir.joinpath(idx) pred_dir.mkdir(parents=True) pred_file = pred_dir.joinpath(file_name) with pred_file.open("w") as f: code = question["test_code"] code = question["test_code"].replace("PLACEHOLDER_CODE_BODY", pred) code = code.replace("PLACEHOLDER_FN_NAME", question["entry_fn_name"]) code = code.replace("PLACEHOLDER_CLS_NAME", question["entry_cls_name"]) f.write(code) commands = [] for cmd, t in zip(question["commands"], question["timeouts"]): commands.append( { "timeout": t if timeout_override is None else timeout_override, "command": [c if c != "__FILENAME__" else file_name for c in cmd], } ) return {"qid": qid, "idx": idx, "commands": commands, "cwd": pred_dir} def _write_preds( preds, languages, language_timeout, question_dicts, tmp_dir, ): commands = [] question_id_to_dict = {} for pred_list, l, q_dict in tqdm( zip(preds, languages, question_dicts), desc="Setup", total=len(preds) ): qid = len(question_id_to_dict) q_dict["language"] = l question_id_to_dict[qid] = q_dict for p in pred_list: commands.append( make_file_and_command( qid=qid, idx=str(len(commands)), pred=p, question=q_dict, timeout_override=language_timeout.get(l), working_dir=tmp_dir, ) ) return question_id_to_dict, commands @evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION) class BabelCodeEval(evaluate.Metric): def _info(self): list_keys = ["timeouts", "commands", "test_case_ids"] question_info_type = { k: datasets.Value(dtype="string") for k in _QUESTION_INFO_KEYS if k not in list_keys } question_info_type["test_case_ids"] = datasets.Value("string") question_info_type["commands"] = datasets.Sequence(datasets.Value("string")) question_info_type["timeouts"] = datasets.Sequence(datasets.Value("int32")) return evaluate.MetricInfo( # This is the description that will appear on the metrics page. description=_DESCRIPTION, citation=_CITATION, inputs_description=_KWARGS_DESCRIPTION, # This defines the format of each prediction and reference features=datasets.Features( { "predictions": datasets.Sequence(datasets.Value("string")), "languages": datasets.Value("string"), "question_dicts": question_info_type, } ), homepage="https://github.com/google-research/babelcode", codebase_urls=["https://github.com/google-research/babelcode"], reference_urls=["https://github.com/google-research/babelcode"], ) def _compute( self, predictions, languages, question_dicts, k=[1, 10, 100], num_workers=4, language_timeout=None, ): """Returns the scores""" if os.getenv("HF_ALLOW_CODE_EVAL", 0) != "1": raise ValueError(_WARNING) language_timeout = language_timeout or {} with tempfile.TemporaryDirectory() as tmp_dir: working_dir = Path(tmp_dir) question_map, pred_commands = _write_preds( preds=predictions, languages=languages, language_timeout=language_timeout, question_dicts=question_dicts, tmp_dir=working_dir, ) results = execute_predictions( pred_commands, num_workers=num_workers, max_task_per_child=5, garbage_collection_freq=500, ) all_results, q_passes, q_pct = _eval_predictions(results, question_map) assert len(q_passes) == len(q_pct) metrics = {} for lang in q_passes: metrics.update( _calculate_metrics(lang, q_passes[lang], q_pct[lang], k_vals=k) ) return metrics, all_results def _eval_single_pred(result, test_ids, num_expected_commands): test_case_results = {k: "MISSING" for k in test_ids} if len(result["results"]) != num_expected_commands: return "HAD_ERROR", 0, test_case_results last_result = result["results"][-1] if last_result.timed_out: return "TIMED_OUT", 0, test_case_results elif last_result.return_code != 0: return "HAD_ERROR", 0, test_case_results elif not last_result.stdout: return "HAD_ERROR", 0, test_case_results for match in STDOUT_PARSE_REGEX.findall(last_result.stdout): idx, test_result = match if idx in test_ids: if test_case_results[idx] != "MISSING": return "UNKNOWN_ERROR", 0, test_case_results test_case_results[idx] = test_result.strip() did_test_fail = False had_error = False num_passed = 0 for r in test_case_results.values(): if r == "PASSED": num_passed += 1 elif r == "FAILED": did_test_fail = True else: had_error = True if had_error: return "HAD_ERROR", num_passed, test_case_results if did_test_fail: return "FAILED", num_passed, test_case_results return "PASSED", num_passed, test_case_results def _eval_predictions(pred_results, question_map): out = [] question_results = defaultdict(lambda: defaultdict(list)) question_pct_pass = defaultdict(lambda: defaultdict(list)) for p in pred_results: question = question_map[p["qid"]] test_cases = question["test_case_ids"] num_expected_commands = len(question["commands"]) outcome, num_passed, test_case_results = _eval_single_pred( p, test_ids=test_cases, num_expected_commands=num_expected_commands ) p["results"] = [dataclasses.asdict(r) for r in p["results"]] p["test_cases"] = test_case_results p["outcome"] = outcome lang = question["language"] question_results[lang][p["qid"]].append(num_passed == len(test_case_results)) question_pct_pass[lang][p["qid"]].append(num_passed / len(test_case_results)) out.append(p) return out, question_results, question_pct_pass def _calculate_metrics(lang, q_passed, q_pcts, k_vals): assert len(q_passed) == len(q_pcts) num_samples = np.zeros(len(q_passed)) num_correct = np.zeros(len(q_passed)) pcts_passed = np.zeros(len(q_passed)) for i, (k, v) in enumerate(q_passed.items()): num_samples[i] = len(v) num_correct[i] = sum(v) pcts_passed[i] = np.mean(q_pcts[k]) out = { f"{lang}/pass@{k}": estimate_pass_at_k(num_samples, num_correct, k).mean() for k in k_vals } out[f"{lang}/mean_pct_pass"] = np.mean(pcts_passed) return out def estimate_pass_at_k(num_samples, num_correct, k): """Estimates pass@k of each problem and returns them in an array.""" def estimator(n: int, c: int, k: int) -> float: """Calculates 1 - comb(n - c, k) / comb(n, k).""" if n - c < k: return 1.0 return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1)) if isinstance(num_samples, int): num_samples_it = itertools.repeat(num_samples, len(num_correct)) else: assert len(num_samples) == len(num_correct) num_samples_it = iter(num_samples) return np.array( [estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)] )