|
from genericpath import sameopenfile |
|
from os import remove |
|
from random import random |
|
import re |
|
import tqdm |
|
import json |
|
import pdb |
|
import io |
|
import os |
|
import statistics |
|
import random;random.seed(42) |
|
|
|
|
|
def filter_less_not_long_data():
    """Filter Lean4 translation records, keeping only sufficiently long ones.

    Merges the three filtered 5k shards into data/lean4_random/15k_filtered.json,
    then filters the random and basic 1k test splits, dropping any record whose
    model_response is <= 400 chars or statement_poof is <= 100 chars
    (thresholds look empirically chosen — confirm with the data owner).
    """

    def _keep(item):
        # Short responses / proofs are presumed low quality and dropped.
        return len(item['model_response']) > 400 and len(item['statement_poof']) > 100

    # Merge the three filtered 5k shards into one 15k training file.
    training_data = []
    for shard in ("data/lean4_random/5k_first_filtered.json",
                  "data/lean4_random/5k_second_filtered.json",
                  "data/lean4_random/5k_third_filtered.json"):
        with open(shard, "r") as f:
            training_data += json.load(f)

    handled_json_list = [item for item in training_data if _keep(item)]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_random/15k_filtered.json", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)

    # Filter the random 1k test split.
    with open("data/lean4_random/1k_test.json", "r") as f:
        random_test_data = json.load(f)
    handled_json_list = [item for item in random_test_data if _keep(item)]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_random/1k_test_filtered.json", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)

    # Filter the basic 1k test split.
    # NOTE(review): this reads and then OVERWRITES the same *_filtered.jsonl
    # file (and parses a .jsonl path with json.load) — it likely should read
    # 1k_test.jsonl instead. Paths preserved pending confirmation.
    with open("data/lean4_basic/1k_test_filtered.jsonl", "r") as f:
        random_test_data = json.load(f)
    handled_json_list = [item for item in random_test_data if _keep(item)]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_basic/1k_test_filtered.jsonl", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)
|
|
|
def statistics_lean4_training_test():
    """Print length statistics for prompts built from the wild 1k sample.

    Loads the random 1k test split (currently unused — see NOTE below), then
    formats each wild QA record with the 'wild' prompt template and reports
    length statistics plus the shortest examples via the nested `handle`.
    """
    import json
    with open("data/lean4_random/1k_test.json", "r") as f:
        test_data = json.load(f)
    # NOTE(review): `test_data` is built here (proof text minus last 2 chars,
    # presumably trimming a trailing newline — confirm) but never used below.
    test_data = [item['statement_poof'][:-2] for item in test_data]

    def handle(json_list):
        # Keep only records above the length thresholds used elsewhere in
        # this file (model_response > 400 chars, statement_poof > 100 chars).
        handled_json_list = [item for item in json_list if len(item['model_response']) > 400 and len(item['statement_poof']) > 100]
        statement_proof_lengths = [len(item['statement_poof']) for item in handled_json_list]
        model_output_lengths = [len(item['model_response']) for item in handled_json_list]

        # Summary statistics over the surviving records.
        statement_proof_stats = {
            'mean': statistics.mean(statement_proof_lengths),
            'median': statistics.median(statement_proof_lengths),
            'min': min(statement_proof_lengths),
            'max': max(statement_proof_lengths)
        }

        model_output_stats = {
            'mean': statistics.mean(model_output_lengths),
            'median': statistics.median(model_output_lengths),
            'min': min(model_output_lengths),
            'max': max(model_output_lengths)
        }

        print(f"total length is {len(handled_json_list)}")
        print(f"Statistics for 'statement_proof':\n {json.dumps(statement_proof_stats, indent=4)}")
        print(f"\nStatistics for 'model_output':\n {json.dumps(model_output_stats, indent=4)}")

        # Track the shortest statement_poof / model_response for inspection.
        min_sp_len = float('inf')
        min_sp_str = ""
        min_mo_len = float('inf')
        min_mo_str = ""

        # Rebuilt here (shadowing the lists above) — redundant but harmless.
        statement_proof_lengths = []
        model_output_lengths = []

        for item in handled_json_list:
            sp_len = len(item['statement_poof'])
            mo_len = len(item['model_response'])
            statement_proof_lengths.append(sp_len)
            model_output_lengths.append(mo_len)

            if sp_len < min_sp_len:
                min_sp_len = sp_len
                min_sp_str = item['statement_poof']

            if mo_len < min_mo_len:
                min_mo_len = mo_len
                min_mo_str = item['model_response']

        # Show the shortest examples so a reviewer can eyeball the threshold.
        print(min_sp_str)
        print(min_mo_str)

    # Local prompt templates (a narrower module-level PROMPT_DICT also exists
    # further down the file; this one additionally has the 'wild' template).
    PROMPT_DICT = {
        "wild": (
            "Statement and proof in natural language:\n\n"
            "# Problem:\n{question}\n\n"
            "# Proof:\n{answer}\n\n"
            "Translate the statement and proof in natural language to lean4:"
        ),
        "lean4": (
            "Statement and proof in natural language:\n\n"
            "{model_response}\n\n"
            "Translate the statement and proof in natural language to lean4:"
        ),
        "prompt_no_input": (
            "Below is an instruction that describes a task. "
            "Write a response that appropriately completes the request.\n\n"
            "### Instruction:\n{instruction}\n\n### Response:"
        ),
    }
    # NOTE(review): a .jsonl path parsed with json.load — works only if the
    # file is actually one JSON document despite the extension; confirm.
    with open("data/wild/wild_sample1k.jsonl", "r") as f:
        basic_test_data = json.load(f)
    basic_list = []
    for item in basic_test_data:
        # Both fields receive the SAME formatted prompt — presumably so the
        # length filter in handle() sees the prompt text in both slots; verify.
        list_item = {
            "model_response": PROMPT_DICT['wild'].format(question = item['question'], answer = item['answer']),
            "statement_poof": PROMPT_DICT['wild'].format(question = item['question'], answer = item['answer']),
        }
        basic_list.append(list_item)

    handle(basic_list)
|
|
|
def filtered():
    """Remove test-set contamination from the random 1k split.

    Loads the basic 1k test statements, then drops from the random split every
    record whose statement_poof (minus its last 2 chars) appears in that set,
    writing the survivors to 1k_test_filtered.json.
    """
    import json
    with open("data/lean4_basic/1k_test.jsonl", "r") as f:
        test_data = json.load(f)
    # Use a set for O(1) membership tests — the original list made the
    # filter below O(n*m).
    test_data = {item['statement_poof'] for item in test_data}

    def filter_items(data, test_data):
        # Keep items whose trimmed proof text is not in the test set.
        # (The [:-2] trim presumably strips a trailing delimiter — confirm.)
        filtered_data = [item for item in tqdm.tqdm(data) if item['statement_poof'][:-2] not in test_data]
        return filtered_data

    with open("data/lean4_random/1k_test.json", "r") as f:
        second_5k = json.load(f)
    filtered_second_5k = filter_items(second_5k, test_data)
    with open("data/lean4_random/1k_test_filtered.json", "w") as f:
        json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2)
    print("Filtered first file length:", len(filtered_second_5k))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def insert_label_for_autoformalization():
    """Stamp each dataset with a 'task' label and save the labeled copy.

    Produces three labeled files: statement_form and statementproof_inform
    variants of the 15k statement-translation set, and a solver-labeled copy
    of all_theorem.jsonl.
    """

    def _tag_and_save(input_files, task, output_path):
        # Load each input file, tag every record, and write the (last loaded)
        # batch to output_path. All current call sites pass a single file.
        for input_file in input_files:
            with open(input_file, "r") as f:
                data = json.load(f)
            for item in data:
                item['task'] = task
        with open(output_path, "w") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    _tag_and_save(
        ["data/lean4_statement_translate/15k_state_problem_translation.json"],
        'statement_form',
        "data/lean4_statement_translate/15k_state_problem_translation_statement_form.json",
    )
    _tag_and_save(
        ["data/lean4_statement_translate/15k_state_problem_translation.json"],
        'statementproof_inform',
        "data/lean4_statement_translate/15k_state_problem_translation_statementproof_inform.json",
    )
    # NOTE(review): all_theorem.jsonl is parsed with json.load — works only
    # if the file is actually a single JSON document despite the extension.
    _tag_and_save(
        ["all_theorem.jsonl"],
        'solver',
        "data/all_theorem_solver.jsonl",
    )
|
|
|
|
|
def _make_r_io_base(f, mode: str): |
|
if not isinstance(f, io.IOBase): |
|
f = open(f, mode=mode) |
|
return f |
|
|
|
# Module-level prompt templates used by show_example() below.
PROMPT_DICT = {
    # Wraps a natural-language statement + proof for translation to Lean 4.
    "lean4": (
        "Statement and proof in natural language:\n\n"
        "{statement_text}\n\n"
        "Translate the statement and proof in natural language to lean4:"
    ),
    # Alpaca-style instruction prompt with no input field.
    "prompt_no_input": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:"
    ),
}
|
|
|
def jload(f, mode="r"):
    """Load a .json file into a dictionary.

    Accepts either a path or an already-open file object; the handle is
    always closed, including when json.load raises (the original leaked
    the descriptor on parse errors).
    """
    f = _make_r_io_base(f, mode)
    try:
        jdict = json.load(f)
    finally:
        f.close()
    return jdict
|
|
|
def show_example():
    """Interactively inspect training examples whose Lean4 output mentions 'odd_iff'.

    Loads the first 5k shard (JSON, with a JSONL fallback), builds
    instruction/output pairs with the module-level 'lean4' prompt template,
    and drops into pdb at each matching example. Debug helper, not a
    production path.
    """
    item = 'data/lean4_random/5k_first.json'
    list_data_dict = []
    try:
        # Try to parse the file as one JSON document first.
        list_data_dict += jload(item)
    except BaseException:
        # Fallback: treat the file as JSONL, one object per line.
        with open(item, 'r') as f:
            lines = f.readlines()
        list_data_dict += [json.loads(line.strip()) for line in lines]

    prompt_lean4 = PROMPT_DICT["lean4"]
    # Convert each record into an instruction-tuning triple.
    list_data_dict = [{'instruction':prompt_lean4.format(statement_text = data['model_response']), 'input':'', 'output':data['statement_poof']} for data in list_data_dict]
    for item in list_data_dict:
        if "odd_iff" in item['output']:

            print("input\n", item['instruction'])
            print('*'*10)
            print("out\n", item['output'])
            # Pause here so the example can be inspected in the debugger.
            pdb.set_trace()
|
|
|
|
|
def handle_lean4_compiler():
    """Slim down compiler output: keep only the fields downstream code reads.

    Reads the 15k training compiler output and rewrites each entry of
    test_data['results'] so it contains only question/answer/total output/
    results, then saves the trimmed file.
    """

    def filter_json(json_data):
        # Project each result dict onto the whitelisted keys.
        wanted = ('question', 'answer', 'total output', 'results')
        return {key: value for key, value in json_data.items() if key in wanted}

    input_file = '../repl/pass_rate_results/lean4_15k_train_output.json'
    output_file = '../repl/pass_rate_results/lean4_15k_train_output_filter.json'
    with open(input_file, "r") as f:
        test_data = json.load(f)
    # Rebuild the results list instead of mutating by index (the original
    # loop also shadowed the `id` builtin).
    test_data['results'] = [filter_json(entry) for entry in test_data['results']]
    with open(f"{output_file}", 'w') as f:
        json.dump(test_data, f,indent=2, ensure_ascii=False)
|
|
|
def locate(location=14):
    """Print how the Mistral tokenizer treats a leading space.

    Tokenizes the same sentence with and without a leading space and prints
    both token-id lists so they can be compared by eye.

    Args:
        location: unused; kept for backward compatibility with callers.
    """
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("../models/Mistral-7B-v0.1/")

    tokens = tokenizer("Hello, how are you?", padding=False, add_special_tokens=False).input_ids
    print(tokens)
    # Same sentence with a leading space — SentencePiece-style tokenizers
    # typically produce different ids for the first token.
    tokens = tokenizer(" Hello, how are you?", padding=False, add_special_tokens=False).input_ids
    print(tokens)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_jsonfile(json_list):
    """Convert raw compiler results into evaluation records.

    Each input item must carry 'question', 'answer', 'total output' (list of
    model generations) and 'results' (list of {'status': ...} dicts, one per
    generation). Returns one record per item with an 'outputs' list where
    label -1 marks a passing compilation and -2 a failing one; responses are
    truncated at the first '#align' marker.
    """

    def get_label(result):
        # -1 = compiled successfully, -2 = failed. (The original also had an
        # unused `output` parameter and unreachable dead code after return.)
        return -1 if result['status'] == 'pass' else -2

    all_results = []
    for idx, item in enumerate(json_list):
        dict_item = {
            "idx": idx,
            "question": item['question'],
            "input": item['question'],
            "ground_truth_cot": item['answer'],
            "ground_truth": item['answer'],
            "outputs": [
                {
                    # Drop everything from '#align' on — alignment directives
                    # are not part of the proof text.
                    "response": item['total output'][i].split("#align")[0],
                    "label": get_label(item['results'][i]),
                }
                for i in range(len(item['results']))
            ],
        }
        all_results.append(dict_item)
    return all_results
|
|
|
def read_jsonl(path: str):
    """Read a JSONL file; fall back to parsing it as one JSON document.

    The original used a bare `except:` (which also swallowed
    FileNotFoundError and then failed confusingly on the reopen) and leaked
    the fallback file descriptor. Here only JSON parse errors trigger the
    fallback, and both handles are closed via `with`.
    """
    try:
        with open(path, encoding='utf-8') as fh:
            # Blank lines are skipped rather than aborting the JSONL parse.
            return [json.loads(line) for line in fh if line.strip()]
    except json.JSONDecodeError:
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)
|
|
|
|
|
|
|
def ensure_parent_dir_exists(file_path):
    """Create the parent directory of `file_path` if it does not exist.

    Fixes two defects: a bare filename made `os.path.dirname` return "" and
    `os.makedirs("")` raise FileNotFoundError; and the exists/makedirs pair
    was racy — `exist_ok=True` makes the create idempotent.
    """
    parent_dir = os.path.dirname(file_path)

    if not parent_dir:
        # Bare filename: the parent is the current directory, nothing to do.
        return

    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)
        print(f"Created directory: {parent_dir}")
    else:
        print(f"Directory already exists: {parent_dir}")
|
def save_jsonl(data, path):
    """Write `data` to `path` as pretty-printed JSON, creating parent dirs.

    Note: despite the name, the output is a single indented JSON document,
    not line-delimited JSONL.
    """
    ensure_parent_dir_exists(path)
    with open(path, 'w') as out_file:
        json.dump(data, out_file, indent=2, ensure_ascii=False)
|
def get_model_solutions_easy(data_dir):
    """Load compiler-result files (comma-separated paths) and report pass rates.

    Prints pass@1 (first attempt compiled) and pass@k (any attempt compiled),
    then returns the converted example records.
    """
    raw_results = []
    for path in data_dir.split(","):
        raw_results += read_jsonl(path)['results']
    examples = handle_jsonfile(raw_results)
    print(f"{len(examples)} examples, each with {len(examples[0]['outputs'])} solutions")

    # Label -1 marks a passing attempt (see handle_jsonfile).
    pass1 = sum(ex['outputs'][0]['label'] == -1 for ex in examples)
    pass5 = sum(any(out['label'] == -1 for out in ex['outputs']) for ex in examples)

    print(pass1/len(examples))
    print(pass5/len(examples))

    return examples
|
|
|
def get_q_a(text):
    """Extract the question and answer from a '# Question:/# Answer:' prompt.

    Returns a (question, answer) tuple of stripped strings. A part whose
    marker is missing yields None instead of the original behavior of
    raising UnboundLocalError on unmatched input.
    """
    question_content = None
    answer_content = None

    # Question text sits between the '# Question:' header and the blank line
    # before '# Answer:'; re.S lets '.' span newlines.
    question = re.search(r"# Question:\n(.*?)\n\n# Answer:", text, re.S)
    if question:
        question_content = question.group(1).strip()

    # Answer runs from '# Answer:' to the next blank line or end of text.
    answer = re.search(r"# Answer:\n(.*?)(\n\n|$)", text, re.S)
    if answer:
        answer_content = answer.group(1).strip()

    return question_content, answer_content
|
def from_compiler_to_lean4(data_dir):
    """Turn passing compiler attempts into autoformalization training data.

    Reads a compiler pass-rate file, prints pass@1 and pass@k over the
    examples, and saves one autoform record per passing attempt. Also builds
    a parallel QA list.

    NOTE(review): `save_data_qa` is populated but never persisted — only the
    autoform file is written; confirm whether a second save_jsonl call is
    missing. The output path is hard-coded regardless of `data_dir`.
    """
    examples = read_jsonl(data_dir)
    pass1 = 0
    passk = 0
    save_data_autoform = []
    save_data_qa = []

    for id in range(len(examples['results'])):
        result = examples['results'][id]
        # pass@1: did the FIRST attempt compile?
        pass1 += 1 if result['results'][0]['status'] == 'pass' else 0
        # `flag` ensures each example contributes at most once to pass@k.
        flag = False
        for idx , item in enumerate(result['results']):
            if item['status'] == 'pass':
                q,a = get_q_a(result['question'])
                save_data_autoform.append(
                    {
                        "informal_question": q,
                        "informal_answer": a,
                        # The generation at the same index as the passing
                        # compile result.
                        "formal_statement_proof": result['total output'][idx],
                        "task": "autoform"
                    }
                )
                save_data_qa.append(
                    {
                        "informal_question": q,
                        "informal_answer": a,
                        "task": "qa"
                    }
                )

                if not flag:
                    passk += 1
                    flag = True

    print(pass1/len(examples['results']))
    print(passk/len(examples['results']))
    print("len", len(save_data_autoform), len(examples['results']))
    save_jsonl(data = save_data_autoform, path= "autoform_data/math_train/g-mistral-qa-gsm8kmath-autoform-forml4-d-mathtrain-t-autoform.jsonl")
|
|
|
|
|
|
|
def show_example_compilation_results(data_dir):
    """Print the generation text of every passing compilation attempt.

    Fixes two defects in the original: the inner loop shadowed the outer
    `item` and indexed 'total output' with the OUTER example index (so the
    printed text did not correspond to the passing attempt), and
    `return examples` referenced an undefined name, raising NameError.
    Now returns the loaded data instead.
    """
    data = read_jsonl(data_dir)

    for result in data['results']:
        # Pair each attempt with its own index so the printed generation
        # matches the attempt that actually compiled.
        for attempt_idx, attempt in enumerate(result['results']):
            if attempt['status'] == 'pass':
                print(result['total output'][attempt_idx])

    return data
|
|
|
def get_novel_premises():
    """WIP debug helper: inspect merged train + test data in the debugger.

    Loads the filtered 15k training set and the filtered 1k test set,
    concatenates them, and drops into pdb — no premise extraction is
    implemented yet.
    """
    training_data = json.load(open("data/lean4_random/15k_filtered.json", 'r', encoding='utf8'))
    random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8'))
    all_data = training_data + random_data
    import pdb
    pdb.set_trace()
|
|
|
|
|
def remove_align():
    """WIP stub: presumably meant to strip '#align' text from responses.

    Currently only loads the data and reads each model_response — the loop
    body assigns to a local (shadowing the `input` builtin) and performs no
    transformation or save.
    """
    random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8'))
    for item in random_data:
        # Dead assignment: value is never used.
        input = item['model_response']
|
|
|
|
|
def handle_math():
    """Convert GSM8K training items into informal QA records and save them."""
    source_path = 'data/test/gsm8k/train.jsonl'
    records = json.load(open(source_path, 'r', encoding='utf8'))

    # Project each GSM8K record onto the informal-QA schema.
    new_data = [
        {
            'informal_question': record['question'],
            'informal_answer': record['answer'],
            'task': 'qa',
        }
        for record in records
    ]

    save_path = 'autoform_data/qa_gsm8k_train.json'
    print(f"len of {len(new_data)} is saved")
    with open(save_path, "w") as json_file:
        json.dump(new_data, json_file, indent=4, ensure_ascii=False)
|
|
|
if __name__ == '__main__':
    # Entry point: convert the compiler pass@10 results for the math training
    # set into autoformalization training data.
    from_compiler_to_lean4("../repl/pass_rate_results/math_train/1/10pass10.jsonl")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|