# auto-info/all_code.py
import io
import json
import os
import pdb
import random
import re
import statistics

import tqdm

random.seed(42)
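# Utilities for building and inspecting Lean 4 autoformalization datasets:
# length-filtering JSON dumps, deduplicating training data against test sets,
# tagging records with task labels, and summarizing Lean 4 compiler pass rates.
# NOTE: 'statement_poof' (sic) is the key actually present in the data files,
# so the misspelling is kept verbatim throughout.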
def filter_less_not_long_data():
    # Keep only items with a sufficiently long model response (> 400 chars)
    # and statement/proof (> 100 chars).
    training_data = []
    with open("data/lean4_random/5k_first_filtered.json", "r") as f:
        training_data += json.load(f)
    with open("data/lean4_random/5k_second_filtered.json", "r") as f:
        training_data += json.load(f)
    with open("data/lean4_random/5k_third_filtered.json", "r") as f:
        training_data += json.load(f)
    # Filter and save the merged training data.
    handled_json_list = [item for item in training_data
                         if len(item['model_response']) > 400 and len(item['statement_poof']) > 100]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_random/15k_filtered.json", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)

    with open("data/lean4_random/1k_test.json", "r") as f:
        random_test_data = json.load(f)
    handled_json_list = [item for item in random_test_data
                         if len(item['model_response']) > 400 and len(item['statement_poof']) > 100]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_random/1k_test_filtered.json", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)

    # NOTE: this reads and rewrites the same file in place (despite the .jsonl
    # extension, the file holds a single JSON array).
    with open("data/lean4_basic/1k_test_filtered.jsonl", "r") as f:
        random_test_data = json.load(f)
    handled_json_list = [item for item in random_test_data
                         if len(item['model_response']) > 400 and len(item['statement_poof']) > 100]
    print("Filtered all length:", len(handled_json_list))
    with open("data/lean4_basic/1k_test_filtered.jsonl", "w") as f:
        json.dump(handled_json_list, f, ensure_ascii=False, indent=2)
def statistics_lean4_training_test():
    with open("data/lean4_random/1k_test.json", "r") as f:
        test_data = json.load(f)
    # (Loaded for reference; not used below.)
    test_data = [item['statement_poof'][:-2] for item in test_data]

    def handle(json_list):
        # Length-filter the items, then report character-length statistics.
        handled_json_list = [item for item in json_list
                             if len(item['model_response']) > 400 and len(item['statement_poof']) > 100]
        statement_proof_lengths = [len(item['statement_poof']) for item in handled_json_list]
        model_output_lengths = [len(item['model_response']) for item in handled_json_list]
        # Calculate statistics.
        statement_proof_stats = {
            'mean': statistics.mean(statement_proof_lengths),
            'median': statistics.median(statement_proof_lengths),
            'min': min(statement_proof_lengths),
            'max': max(statement_proof_lengths),
        }
        model_output_stats = {
            'mean': statistics.mean(model_output_lengths),
            'median': statistics.median(model_output_lengths),
            'min': min(model_output_lengths),
            'max': max(model_output_lengths),
        }
        # Print results.
        print(f"total length is {len(handled_json_list)}")
        print(f"Statistics for 'statement_proof':\n {json.dumps(statement_proof_stats, indent=4)}")
        print(f"\nStatistics for 'model_output':\n {json.dumps(model_output_stats, indent=4)}")
        # Find and print the shortest statement/proof and model response.
        min_sp_len = float('inf')
        min_sp_str = ""
        min_mo_len = float('inf')
        min_mo_str = ""
        for item in handled_json_list:
            sp_len = len(item['statement_poof'])
            mo_len = len(item['model_response'])
            if sp_len < min_sp_len:
                min_sp_len = sp_len
                min_sp_str = item['statement_poof']
            if mo_len < min_mo_len:
                min_mo_len = mo_len
                min_mo_str = item['model_response']
        print(min_sp_str)
        print(min_mo_str)

    # (Retained from development.)
    # training_data = []
    # with open("data/lean4_random/5k_first_filtered.json", "r") as f:
    #     training_data += json.load(f)
    # with open("data/lean4_random/5k_second_filtered.json", "r") as f:
    #     training_data += json.load(f)
    # with open("data/lean4_random/5k_third_filtered.json", "r") as f:
    #     training_data += json.load(f)
    # print("statistics for training data")
    # handle(training_data)
    # with open("data/lean4_random/1k_test.json", "r") as f:
    #     random_test_data = json.load(f)
    # print("statistics for random test data")
    # handle(random_test_data)

    # This local PROMPT_DICT shadows the module-level one defined below.
    PROMPT_DICT = {
        "wild": (
            "Statement and proof in natural language:\n\n"
            "# Problem:\n{question}\n\n"
            "# Proof:\n{answer}\n\n"
            "Translate the statement and proof in natural language to lean4:"
        ),
        "lean4": (
            "Statement and proof in natural language:\n\n"
            "{model_response}\n\n"
            "Translate the statement and proof in natural language to lean4:"
        ),
        "prompt_no_input": (
            "Below is an instruction that describes a task. "
            "Write a response that appropriately completes the request.\n\n"
            "### Instruction:\n{instruction}\n\n### Response:"
        ),
    }
    with open("data/wild/wild_sample1k.jsonl", "r") as f:
        basic_test_data = json.load(f)
    basic_list = []
    for item in basic_test_data:
        list_item = {
            "model_response": PROMPT_DICT['wild'].format(question=item['question'], answer=item['answer']),
            "statement_poof": PROMPT_DICT['wild'].format(question=item['question'], answer=item['answer']),
        }
        basic_list.append(list_item)
    # print("statistics for basic test data")
    handle(basic_list)
def filtered():
    # Drop from the random split any item whose statement/proof already
    # appears in the basic test set.
    with open("data/lean4_basic/1k_test.jsonl", "r") as f:
        test_data = json.load(f)
    # A set makes the membership test below O(1) per item.
    test_data = set(item['statement_poof'] for item in test_data)

    def filter_items(data, test_data):
        filtered_data = [item for item in tqdm.tqdm(data) if item['statement_poof'][:-2] not in test_data]
        return filtered_data

    # (Retained from development.)
    # with open("data/lean4_random/5k_first.json", "r") as f:
    #     second_5k = json.load(f)
    # filtered_second_5k = filter_items(second_5k, test_data)
    # with open("data/lean4_random/5k_first_filtered.json", "w") as f:
    #     json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2)
    # print("Filtered first file length:", len(filtered_second_5k))
    # ... (same for 5k_second and 5k_third)

    with open("data/lean4_random/1k_test.json", "r") as f:
        random_test = json.load(f)
    filtered_random_test = filter_items(random_test, test_data)
    with open("data/lean4_random/1k_test_filtered.json", "w") as f:
        json.dump(filtered_random_test, f, ensure_ascii=False, indent=2)
    print("Filtered test file length:", len(filtered_random_test))
def insert_label_for_autoformalization():
    # Tag each record with its task type so mixed training data can be routed.
    input_lists = ["data/lean4_statement_translate/15k_state_problem_translation.json"]
    for input_file in input_lists:
        with open(input_file, "r") as f:
            test_data = json.load(f)
        for item in test_data:
            item['task'] = 'statement_form'
        with open("data/lean4_statement_translate/15k_state_problem_translation_statement_form.json", "w") as f:
            json.dump(test_data, f, ensure_ascii=False, indent=2)

    input_lists = ["data/lean4_statement_translate/15k_state_problem_translation.json"]
    for input_file in input_lists:
        with open(input_file, "r") as f:
            test_data = json.load(f)
        for item in test_data:
            item['task'] = 'statementproof_inform'
        with open("data/lean4_statement_translate/15k_state_problem_translation_statementproof_inform.json", "w") as f:
            json.dump(test_data, f, ensure_ascii=False, indent=2)

    input_lists = ["all_theorem.jsonl"]
    for input_file in input_lists:
        with open(input_file, "r") as f:
            test_data = json.load(f)
        for item in test_data:
            item['task'] = 'solver'
        with open("data/all_theorem_solver.jsonl", "w") as f:
            json.dump(test_data, f, ensure_ascii=False, indent=2)
def _make_r_io_base(f, mode: str):
    if not isinstance(f, io.IOBase):
        f = open(f, mode=mode)
    return f
PROMPT_DICT = {
    "lean4": (
        "Statement and proof in natural language:\n\n"
        "{statement_text}\n\n"
        "Translate the statement and proof in natural language to lean4:"
    ),
    "prompt_no_input": (
        "Below is an instruction that describes a task. "
        "Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:"
    ),
}
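# Example: PROMPT_DICT["lean4"].format(statement_text="...") yields the
# translation prompt that show_example() below uses to build instruction data.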
def jload(f, mode="r"):
    """Load a .json file into a dictionary (or list)."""
    f = _make_r_io_base(f, mode)
    jdict = json.load(f)
    f.close()
    return jdict
def show_example():
    item = 'data/lean4_random/5k_first.json'
    list_data_dict = []
    try:
        list_data_dict += jload(item)
    except BaseException:
        # Fall back to JSON-lines parsing.
        with open(item, 'r') as f:
            lines = f.readlines()
        list_data_dict += [json.loads(line.strip()) for line in lines]
    # list_data_dict = random.sample(list_data_dict, len(list_data_dict))
    prompt_lean4 = PROMPT_DICT["lean4"]
    list_data_dict = [{'instruction': prompt_lean4.format(statement_text=data['model_response']),
                       'input': '',
                       'output': data['statement_poof']} for data in list_data_dict]
    for item in list_data_dict:
        if "odd_iff" in item['output']:
            # if len(item['output']) > 150:
            print("input\n", item['instruction'])
            print('*' * 10)
            print("out\n", item['output'])
            pdb.set_trace()
def handle_lean4_compiler():
    # Strip compiler output records down to the fields worth keeping.
    def filter_json(json_data):
        filtered_data = {}
        for key in json_data.keys():
            if key in ['question', 'answer', 'total output', 'results']:
                filtered_data[key] = json_data[key]
        return filtered_data

    input_file = '../repl/pass_rate_results/lean4_15k_train_output.json'
    output_file = '../repl/pass_rate_results/lean4_15k_train_output_filter.json'
    with open(input_file, "r") as f:
        test_data = json.load(f)
    json_list = test_data['results']
    for i in range(len(json_list)):
        json_list[i] = filter_json(json_list[i])
    test_data['results'] = json_list
    with open(output_file, 'w') as f:
        json.dump(test_data, f, indent=2, ensure_ascii=False)
def locate(location=14):
    from transformers import AutoTokenizer
    # Example string.
    text = "Hello, how are you?"
    # Initialize the tokenizer.
    tokenizer = AutoTokenizer.from_pretrained("../models/Mistral-7B-v0.1/")
    # Tokenize the string, with and without a leading space, to compare ids.
    tokens = tokenizer(text, padding=False, add_special_tokens=False).input_ids
    print(tokens)
    tokens = tokenizer(" " + text, padding=False, add_special_tokens=False).input_ids
    print(tokens)
    # (Retained from development; `location` is only used here.)
    # partial_tokens = tokenizer(text[:location].split()[-1], padding=False, add_special_tokens=False).input_ids
    # def find_subarray(lst, sub_lst):
    #     for i in range(len(lst) - len(sub_lst) + 1):
    #         if lst[i:i+len(sub_lst)] == sub_lst:
    #             return i + len(sub_lst)
    # print(find_subarray(tokens, partial_tokens))
def handle_jsonfile(json_list):
    # Label -1 = compiled (pass), -2 = failed.
    def get_label(result, output):
        if result['status'] == 'pass':
            return -1
        else:
            return -2

    all_results = []
    idx = 0
    for item in json_list:
        dict_item = {
            "idx": idx,
            "question": item['question'],
            "input": item['question'],
            "ground_truth_cot": item['answer'],
            "ground_truth": item['answer'],
            "outputs": [
                {
                    "response": item['total output'][i].split("#align")[0],
                    "label": get_label(item['results'][i], item['total output'][i].split("#align")[0]),
                }
                for i in range(len(item['results']))
            ],
        }
        idx += 1
        all_results.append(dict_item)
    return all_results
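# Assumed layout of a compiler-results file (inferred from the accessors above):
# {"results": [{"question": ..., "answer": ..., "total output": [str, ...],
#               "results": [{"status": "pass" | "fail", ...}, ...]}, ...]}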
def read_jsonl(path: str):
    # Try JSON-lines first; fall back to a single JSON document.
    try:
        with open(path) as fh:
            return [json.loads(line) for line in fh.readlines() if line]
    except Exception:
        return json.load(open(path, 'r', encoding='utf-8'))
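# Example: read_jsonl("a.jsonl") parses one JSON object per line, while a
# pretty-printed JSON array (as written by save_jsonl) fails line-wise parsing
# and falls back to the json.load branch.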
def ensure_parent_dir_exists(file_path):
    # Extract the directory part of the file path.
    parent_dir = os.path.dirname(file_path)
    # Create the directory if it doesn't exist (a bare filename has no parent).
    if parent_dir and not os.path.exists(parent_dir):
        os.makedirs(parent_dir)
        print(f"Created directory: {parent_dir}")
    else:
        print(f"Directory already exists: {parent_dir}")
def save_jsonl(data, path):
    # Despite the name, this writes a single (indented) JSON document.
    ensure_parent_dir_exists(path)
    with open(path, 'w') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
def get_model_solutions_easy(data_dir):
    examples = []
    for dd in data_dir.split(","):
        examples += read_jsonl(dd)['results']
    examples = handle_jsonfile(examples)
    print(f"{len(examples)} examples, each with {len(examples[0]['outputs'])} solutions")
    pass1 = 0
    pass5 = 0
    for item in examples:
        # First sampled output compiled?
        pass1 += item['outputs'][0]['label'] == -1
        # Any sampled output compiled?
        for output in item['outputs']:
            if output['label'] == -1:
                pass5 += 1
                break
    print(pass1 / len(examples))
    print(pass5 / len(examples))
    return examples
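# The two printed numbers are pass@1 (first sample compiles) and pass@k over
# all samples per problem; 'pass5' is the historical name for the latter.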
def get_q_a(text):
    # Extract the content between the "# Question:" and "# Answer:" headers;
    # returns None for any part that is missing.
    question_content, answer_content = None, None
    question = re.search(r"# Question:\n(.*?)\n\n# Answer:", text, re.S)
    if question:
        question_content = question.group(1).strip()
    answer = re.search(r"# Answer:\n(.*?)(\n\n|$)", text, re.S)
    if answer:
        answer_content = answer.group(1).strip()
    return question_content, answer_content
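# Example: get_q_a("# Question:\n1+1?\n\n# Answer:\n2") returns ("1+1?", "2").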
def from_compiler_to_lean4(data_dir):
    # (Retained from development.)
    # examples = []
    # for dd in data_dir.split(","):
    #     examples += read_jsonl(dd)['results']
    # sample_examples = random.sample(examples, 1000)
    # with open('sampled_compilation_results/sample_1k.json', 'w') as f:
    #     json.dump(sample_examples, f, indent=2, ensure_ascii=False)
    examples = read_jsonl(data_dir)
    pass1 = 0
    passk = 0
    save_data_autoform = []
    save_data_qa = []
    for id in range(len(examples['results'])):
        result = examples['results'][id]
        pass1 += 1 if result['results'][0]['status'] == 'pass' else 0
        flag = False
        for idx, item in enumerate(result['results']):
            if item['status'] == 'pass':
                q, a = get_q_a(result['question'])
                save_data_autoform.append({
                    "informal_question": q,
                    "informal_answer": a,
                    "formal_statement_proof": result['total output'][idx],
                    "task": "autoform",
                })
                save_data_qa.append({
                    "informal_question": q,
                    "informal_answer": a,
                    "task": "qa",
                })
                if not flag:
                    passk += 1
                    flag = True
    print(pass1 / len(examples['results']))
    print(passk / len(examples['results']))
    print("len", len(save_data_autoform), len(examples['results']))
    save_jsonl(data=save_data_autoform,
               path="autoform_data/math_train/g-mistral-qa-gsm8kmath-autoform-forml4-d-mathtrain-t-autoform.jsonl")
def show_example_compilation_results(data_dir):
    examples = read_jsonl(data_dir)
    # Print every generated output that compiled successfully.
    for result in examples['results']:
        for idx, attempt in enumerate(result['results']):
            if attempt['status'] == 'pass':
                print(result['total output'][idx])
                # print(result['cmd'][idx].split('\n\n')[-1])
                # print(attempt['stdout']['tactics'])
                # print(attempt['stdout']['messages'])
                # print(attempt['stdout']['env'])
                # pdb.set_trace()
    return examples
def get_novel_premises():
    training_data = json.load(open("data/lean4_random/15k_filtered.json", 'r', encoding='utf8'))
    random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8'))
    all_data = training_data + random_data
    # Drop into the debugger to inspect all_data interactively.
    pdb.set_trace()
def remove_align():
    # NOTE: unfinished; the '#align' stripping is not implemented and the
    # loop never writes anything back.
    random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8'))
    for item in random_data:
        response = item['model_response']  # read but unused
def handle_math():
    input_path = 'data/test/gsm8k/train.jsonl'
    # read_jsonl handles both JSON-lines and plain JSON files.
    random_data = read_jsonl(input_path)
    new_data = []
    for item in random_data:
        new_data.append({
            'informal_question': item['question'],
            'informal_answer': item['answer'],
            'task': 'qa',
        })
    save_path = 'autoform_data/qa_gsm8k_train.json'
    print(f"saving {len(new_data)} records")
    with open(save_path, "w") as json_file:
        json.dump(new_data, json_file, indent=4, ensure_ascii=False)
if __name__ == '__main__':
    from_compiler_to_lean4("../repl/pass_rate_results/math_train/1/10pass10.jsonl")
    # handle_math()
    # remove_align()
    # get_novel_premises()
    # show_example_compilation_results(data_dir='../repl/pass_rate_results/gpt_result/gpt4_wild.json')
    # locate()
    # handle_lean4_compiler()
    # show_example()
    # insert_label_for_autoformalization()
    # filtered()
    # statistics_lean4_training_test()
    # filter_less_not_long_data()