import json import os from execution import check_correctness import copy import argparse from tqdm import tqdm import subprocess from concurrent.futures import ThreadPoolExecutor import concurrent.futures import os import re import shutil import contextlib from concurrent.futures import ThreadPoolExecutor import concurrent.futures from tqdm import tqdm import contextlib import io import os import signal from tqdm import tqdm class TimeoutException(Exception): pass class WriteOnlyStringIO(io.StringIO): """ StringIO that throws an exception when it's read from """ def read(self, *args, **kwargs): raise IOError def readline(self, *args, **kwargs): raise IOError def readlines(self, *args, **kwargs): raise IOError def readable(self, *args, **kwargs): """ Returns True if the IO object can be read. """ return False class redirect_stdin(contextlib._RedirectStream): # type: ignore _stream = 'stdin' @contextlib.contextmanager def swallow_io(): stream = WriteOnlyStringIO() with contextlib.redirect_stdout(stream): with contextlib.redirect_stderr(stream): with redirect_stdin(stream): yield @contextlib.contextmanager def time_limit(seconds: float): def signal_handler(signum, frame): raise TimeoutException("Timed out!") signal.setitimer(signal.ITIMER_REAL, seconds) signal.signal(signal.SIGALRM, signal_handler) try: yield finally: signal.setitimer(signal.ITIMER_REAL, 0) ListNode_text = """ class ListNode: def __init__(self, val=0, next=None): self.val = val self.next = next """ TreeNode_text = """ class TreeNode: def __init__(self, val=0, left=None, right=None, next=None): self.val = val self.left = left self.right = right self.next = next """ import_pkg = """ from typing import * from bisect import * from collections import * from copy import * from datetime import * from heapq import * from math import * from re import * from string import * from random import * from itertools import * from functools import * from operator import * import string import re import datetime import collections import heapq import bisect import copy import math import random import itertools import functools import operator """ memory_profiler_prompt = r""" def parse_profile_table(profile_table: str): table = {"filename": None, "rows": []} for line in profile_table.strip().split("\n"): if line.startswith("Filename:"): table["filename"] = line.split(": ")[1] elif re.match(r"^\s*\d+", line): parts = re.split(r"\s{2,}", line.strip(), maxsplit=4) if len(parts) == 5 and "iB" in parts[1] and "iB" in parts[2]: table["rows"].append({ "line": int(parts[0]), "mem_usage": parts[1], "increment": parts[2], "occurrences": int(parts[3]), "line_contents": parts[4], }) else: parts = re.split(r"\s{2,}", line.strip(), maxsplit=1) table["rows"].append({ "line": int(parts[0]), "line_contents": parts[1] if len(parts) == 2 else "", }) return table def print_averaged_results(profile_log: str, precision: int = 1): tables = [parse_profile_table(table) for table in profile_log.split("\n\n\n")] averaged_table = defaultdict(lambda: defaultdict(list)) for table in tables: filename = table["filename"] for row in table["rows"]: line = row["line"] if "mem_usage" in row: mem_usage = float(row["mem_usage"].split()[0]) increment = float(row["increment"].split()[0]) occurrences = row["occurrences"] averaged_table[filename][line].append((mem_usage, increment, occurrences)) else: averaged_table[filename][line].append(tuple()) stream = sys.stdout template = '{0:>6} {1:>12} {2:>12} {3:>10} {4:<}' for filename, lines in averaged_table.items(): header = template.format('Line #', 'Mem usage', 'Increment', 'Occurrences', 'Line Contents') stream.write(u'Filename: ' + filename + '\n\n') stream.write(header + u'\n') stream.write(u'=' * len(header) + '\n') all_lines = linecache.getlines(filename) float_format = u'{0}.{1}f'.format(precision + 4, precision) template_mem = u'{0:' + float_format + '} MiB' for lineno, mem_values in lines.items(): # TODO: should average the rest or not? # mem_values = [(50.1, 0.0, 4), (51.1, 0.0, 6), ()] if any([len(m) == 0 for m in mem_values]): tmp = template.format(lineno, "", "", "", all_lines[lineno - 1]) else: mem_usage_sum = sum(m[0] for m in mem_values) increment_sum = sum(m[1] for m in mem_values) occurrences_sum = sum(m[2] for m in mem_values) count = len(mem_values) avg_mem_usage = mem_usage_sum / count avg_increment = increment_sum / count avg_occurrences = occurrences_sum / count avg_mem_usage_str = template_mem.format(avg_mem_usage) avg_increment_str = template_mem.format(avg_increment) tmp = template.format(lineno, avg_mem_usage_str, avg_increment_str, int(avg_occurrences), all_lines[lineno - 1]) stream.write(tmp) print_averaged_results(profile_stream.getvalue(), precision=PROFILE_PRECISION) """ memory_profiler_pkgs = r""" from collections import defaultdict, deque from memory_profiler import profile import io profile_stream = io.StringIO() PROFILE_PRECISION = 1 import re import sys import linecache """ def calculate_memory_usage(dat_file_path): with open(dat_file_path, 'r') as file: prev_time = 0 prev_mem_mb = 0 mem_time_mb_s = 0 next(file) for line in file: if not line.startswith('MEM'): continue # Skip any line that does not start with 'MEM' parts = line.split() mem_in_mb = float(parts[1]) timestamp = float(parts[2]) if prev_time > 0: time_interval_s = timestamp - prev_time mem_time_mb_s += (prev_mem_mb + mem_in_mb) / 2 * time_interval_s prev_time = timestamp prev_mem_mb = mem_in_mb return mem_time_mb_s def calculate_runtime(dat_file_path): with open(dat_file_path, 'r') as file: start_time = float("inf") end_time = float("-inf") next(file) for line in file: if not line.startswith('MEM'): continue # Skip any line that does not start with 'MEM' parts = line.split() timestamp = float(parts[2]) start_time = min(start_time, timestamp) end_time = max(end_time, timestamp) return max(end_time - start_time,0) def report_max_memory_usage(dat_file_path): max_memory_usage = 0 with open(dat_file_path, 'r') as file: prev_time = 0 prev_mem_mb = 0 mem_time_mb_s = 0 next(file) for line in file: if not line.startswith('MEM'): continue # Skip any line that does not start with 'MEM' parts = line.split() mem_in_mb = float(parts[1]) max_memory_usage = max(max_memory_usage, mem_in_mb) return max_memory_usage def add_profile_decorator_to_python_file(file_path,entry_point): """给Python文件中的函数自动添加@profile装饰器。""" try: with open(file_path, 'r') as file: lines = file.readlines() if "humaneval" in file_path: with open(file_path, 'w') as file: inside_class = False class_indent = 0 for line in lines: stripped_line = line.lstrip() if stripped_line.startswith(f"def {entry_point}"): inside_class = True class_indent = len(line) - len(stripped_line) file.write('@profile\n') file.write(line) continue if inside_class: if stripped_line and not line[class_indent].isspace(): inside_class = False elif stripped_line.startswith("def "): file.write(' ' * class_indent + '@profile\n') file.write(line) if "mbpp" in file_path: entry_point with open(file_path, 'w') as file: inside_class = False class_indent = 0 for line in lines: stripped_line = line.lstrip() if stripped_line.startswith(f"def {entry_point}"): inside_class = True class_indent = len(line) - len(stripped_line) file.write('@profile\n') file.write(line) continue if inside_class: if stripped_line and not line[class_indent].isspace(): inside_class = False elif stripped_line.startswith("def "): file.write(' ' * class_indent + '@profile\n') file.write(line) else: with open(file_path, 'w') as file: inside_class = False class_indent = 0 for line in lines: stripped_line = line.lstrip() if stripped_line.startswith("class Solution"): inside_class = True class_indent = len(line) - len(stripped_line) file.write(line) continue if inside_class: if stripped_line and not line[class_indent].isspace(): inside_class = False elif stripped_line.startswith("def "): file.write(' ' * class_indent + ' @profile\n') file.write(line) except Exception as e: # print(f"Error during the file processing: {e}") pass def add_profile_for_memory_profiler(code_string,data): """给Python代码中的函数自动添加@profile装饰器。""" entry_point = "" try: if "task_id" in data.keys() and "HumanEval" in data["task_id"]: entry_point = data["entry_point"] lines = code_string.split('\n') new_lines = [] inside_class = False class_indent = 0 first_function = True for line in lines: stripped_line = line.lstrip() if stripped_line.startswith(f"def {entry_point}"): inside_class = True class_indent = len(line) - len(stripped_line) new_lines.append(' ' * class_indent + '@profile(stream=profile_stream, precision=PROFILE_PRECISION)') new_lines.append(line) return '\n'.join(new_lines) elif "task_id" in data.keys(): entry_point = data["entry_point"] lines = code_string.split('\n') new_lines = [] inside_class = False class_indent = 0 first_function = True for line in lines: stripped_line = line.lstrip() if stripped_line.startswith(f"def {entry_point}"): inside_class = True class_indent = len(line) - len(stripped_line) new_lines.append(' ' * class_indent + '@profile(stream=profile_stream, precision=PROFILE_PRECISION)') new_lines.append(line) return '\n'.join(new_lines) else: lines = code_string.split('\n') new_lines = [] inside_class = False class_indent = 0 first_function = True for line in lines: stripped_line = line.lstrip() if stripped_line.startswith("class Solution"): inside_class = True class_indent = len(line) - len(stripped_line) new_lines.append(line) continue if inside_class: if stripped_line and not line[class_indent].isspace(): inside_class = False elif stripped_line.startswith("def ") and first_function: new_lines.append(' ' * class_indent + ' @profile(stream=profile_stream, precision=PROFILE_PRECISION)') first_function = False new_lines.append(line) return '\n'.join(new_lines) except Exception as e: return code_string def calculate_line_efficiency(completion_file,entry_point): try: path, filename = os.path.split(completion_file) tmp_py_script_filename = f"{filename.split('.')[0]}_tmp.py" tmp_py_script = os.path.join(path, tmp_py_script_filename) tmp_lprof_filename = f"{tmp_py_script_filename}.lprof" # 期望的lprof文件名 # 复制原始脚本到临时文件,并添加@profile装饰器 subprocess.run(['cp', completion_file, tmp_py_script],check=True, capture_output=True, text=True) add_profile_decorator_to_python_file(tmp_py_script,entry_point) subprocess.run(['timeout',"10",'kernprof', '-l', tmp_py_script_filename], cwd=path, capture_output=True, text=True, check=True) # 生成性能报告 overhead_dir = path # os.makedirs(overhead_dir, exist_ok=True) report_file = os.path.join(overhead_dir, tmp_py_script_filename.replace('.py', '.txt')) with open(report_file, 'w') as f: subprocess.run(['timeout',"10",'python', '-m', 'line_profiler', tmp_lprof_filename], cwd=path, stdout=f) with open(report_file, 'r') as f: report_content = f.read() # print(report_content) except subprocess.CalledProcessError as e: # print(f"Error during the execution: {e}") report_content = f"Error during the execution: {e}" # # 清理临时文件 if os.path.exists(tmp_py_script): os.remove(tmp_py_script) if os.path.exists(f"{tmp_py_script}.lprof"): os.remove(f"{tmp_py_script}.lprof") return report_content def humaneval_add_string_to_py_file(data,evaluation_code=False, path="./tmp/"): if "canonical_solution" in path: data["completion"] = data["canonical_solution"] if evaluation_code==False: test_case = data["test"] else: test_case = data["small_test_cases"] # test_case = data["small_test_cases"] problem_idx = data["task_id"].split("/")[1] return_path,full_code = None,"" tmp_code = data["completion"].split("\n") code = [] for string in tmp_code: if "print(" in string: continue else: code.append(string) data["completion"] = "\n".join(code) try: if f"```python" in data["completion"]: start_idx = data["completion"].find(f"```python") data["completion"] = data["completion"][start_idx+len(f"```python"):] if "```" in data["completion"]: end_idx = data["completion"].find("```") data["completion"] = data["completion"][:end_idx] full_code = import_pkg+ "\n"+data["prompt"] + "\n"+data["completion"] + "\n" + test_case # with open(f"./{path}/{problem_idx}.py", "w") as f: # f.write(full_code) # return_path = f"./{path}/{problem_idx}.py" result = check_correctness(full_code,timeout=10.0) if result["passed"]: with open(f"./{path}/{problem_idx}.py", "w") as f: f.write(full_code) return_path = f"./{path}/{problem_idx}.py" # print(return_path) else: return_path = None except Exception as e: pass # print(return_path,full_code) return return_path,full_code def mbpp_add_string_to_py_file(data,evaluation_code=False, path="./tmp/"): if "canonical_solution" in path: data["completion"] = data["code"] if evaluation_code==False: test_case = data["test"] else: test_case = "\n".join(data["test_list"]) # test_case = data["small_test_cases"] problem_idx = str(data["task_id"]) return_path,full_code = None,"" tmp_code = data["completion"].split("\n") code = [] for string in tmp_code: if "print(" in string: continue else: code.append(string) data["completion"] = "\n".join(code) try: if f"```python" in data["completion"]: start_idx = data["completion"].find(f"```python") data["completion"] = data["completion"][start_idx+len(f"```python"):] if "```" in data["completion"]: end_idx = data["completion"].find("```") data["completion"] = data["completion"][:end_idx] full_code = "\n".join(data["test_imports"])+ "\n"+data["completion"] + "\n" + test_case # with open(f"./{path}/{problem_idx}.py", "w") as f: # f.write(full_code) # return_path = f"./{path}/{problem_idx}.py" result = check_correctness(full_code,timeout=10.0) if result["passed"]: with open(f"./{path}/{problem_idx}.py", "w") as f: f.write(full_code) return_path = f"./{path}/{problem_idx}.py" except Exception as e: # print(e) pass # print(return_path,full_code) return return_path,full_code def add_string_to_py_file(data,evaluation_code=False, path="./tmp/"): if "canonical_solution" in path: data["completion"] = data["canonical_solution"] if evaluation_code==False: test_case = data["test_case"] else: test_case = data["small_test_cases"] # test_case = data["small_test_cases"] problem_idx = data["problem_idx"] return_path,full_code = None,"" tmp_code = data["completion"].split("\n") code = [] for string in tmp_code: if "print(" in string: continue else: code.append(string) data["completion"] = "\n".join(code) try: if "class Solution" in data["completion"]: if "```python" in data["completion"]: start_idx = data["completion"].find("```python") data["completion"] = data["completion"][start_idx+9:] if "```" in data["completion"]: end_idx = data["completion"].find("```") data["completion"] = data["completion"][:end_idx] test_case = test_case.split("\n")[:100] test_case = "\n".join(test_case) # import_pkg full_code = import_pkg + "\n"+TreeNode_text + "\n"+ListNode_text + "\n" + data["completion"] + "\nsolution=Solution()\n" + test_case # with open(f"./{path}/{problem_idx}.py", "w") as f: # f.write(full_code) # return_path = f"./{path}/{problem_idx}.py" result = check_correctness(full_code,timeout=10.0) if result["passed"]: with open(f"./{path}/{problem_idx}.py", "w") as f: f.write(full_code) return_path = f"./{path}/{problem_idx}.py" # print(return_path) else: return_path = None except Exception as e: # print(e) pass return return_path,full_code def calculate_code_execution_efficiency(data,evaluation_code=False,path="./tmp/",max_execution_time=10): entry_point = "" try: if "task_id" in data.keys() and "HumanEval" in str(data["task_id"]): problem_idx = data["task_id"].split("/")[1] completion_file,full_code = humaneval_add_string_to_py_file(data,evaluation_code=evaluation_code, path=path) entry_point = data["entry_point"] # print(data.keys()) # print(data["dataset"]) elif "dataset" in data.keys() and data["dataset"]=="mbpp": problem_idx = data["task_id"] completion_file,full_code = mbpp_add_string_to_py_file(data,evaluation_code=evaluation_code, path=path) code_example = data["code"] match = re.search(r"def\s+(\w+)\s*\(", code_example) if match: entry_point = match.group(1) else: test_example = data["test_list"][0] match = re.search(r"assert\s+(\w+)\s*\(", test_example) if match: entry_point = match.group(1) else: completion_file== None else: problem_idx = data["problem_idx"] completion_file,full_code = add_string_to_py_file(data,evaluation_code=evaluation_code, path=path) except Exception as e: # print(e) completion_file = None if completion_file == None: # print("test") overhead = f""" The code execution failed. """ canonical_solution_memory_usage = 0 canonical_solution_execution_time = 0 canonical_solution_max_memory_usage = 0 executable = False return overhead, canonical_solution_memory_usage, canonical_solution_execution_time, canonical_solution_max_memory_usage, executable script_path = './run_code.sh' completion_dat_file = f'./{path}/{problem_idx}.dat' try: subprocess.run([script_path, completion_file, completion_dat_file,str(max_execution_time)], check=True, capture_output=True, text=True) canonical_solution_memory_usage = calculate_memory_usage(completion_dat_file) canonical_solution_execution_time = calculate_runtime(completion_dat_file) canonical_solution_max_memory_usage = report_max_memory_usage(completion_dat_file) executable = True overhead = f""" The total memory usage during the code execution is: {canonical_solution_memory_usage} MB*s. The total execution time is: {canonical_solution_execution_time} s. The maximum memory peak requirement is: {canonical_solution_max_memory_usage} MB. """ except Exception as e: # print(e) overhead = f""" The code execution failed. """ canonical_solution_memory_usage = 0 canonical_solution_execution_time = 0 canonical_solution_max_memory_usage = 0 executable = False return overhead, canonical_solution_memory_usage, canonical_solution_execution_time, canonical_solution_max_memory_usage, executable def fetch_completion(dataset,model): with ThreadPoolExecutor() as executor: future_to_entry = {executor.submit(calculate_code_execution_efficiency, copy.deepcopy(entry),False, path=model,max_execution_time=10): entry for entry in tqdm(dataset)} for future in tqdm(concurrent.futures.as_completed(future_to_entry)): entry = future_to_entry[future] try: updated_entry = future.result() idx = dataset.index(entry) dataset[idx] = updated_entry except Exception as e: print(e) return dataset def run_model_task(task, model, file): if "/" in model: model = model.split("/")[1] dat_path = f"./results/{task}_{model}" canonical_solution_path = f"./results/{task}_canonical_solution" with open(file, "r") as f: dataset = json.load(f) if os.path.exists(dat_path): shutil.rmtree(dat_path) if os.path.exists(canonical_solution_path): shutil.rmtree(canonical_solution_path) if os.path.exists(dat_path) == False: os.makedirs(dat_path) if os.path.exists(canonical_solution_path) == False: os.makedirs(canonical_solution_path) fetch_completion(dataset,dat_path) with open(file, "r") as f: dataset = json.load(f) for i in range(len(dataset)): dataset[i]["dataset"] = f"{task}" fetch_completion(dataset,canonical_solution_path) if __name__ == "__main__": parse = argparse.ArgumentParser() parse.add_argument("--task", type=str, default="EffiBench") parse.add_argument("--model", type=str, default="gpt-4") parse.add_argument("--file", type=str, default="") args = parse.parse_args() if not args.file: args.file = f"./{args.task}_{args.model}.json" run_model_task(args.task, args.model, args.file)