logical-reasoning / llm_toolkit /logical_reasoning_utils.py
inflaton's picture
openai zero-shot results
8b9bb19
raw
history blame
16.2 kB
import os
import re
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.ticker import MultipleLocator
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm
print(f"loading {__file__}")
P1 = """你是一个逻辑游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜题。
2. 参与者可以通过提问来获取线索,尝试解开谜题。
3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。
4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。
请严格按照这些规则回答参与者提出的问题。
谜题: {}
实际情况: {}
参与者提出的问题: {}
"""
P1_en = """You are the host of a logic game. The rules of the game are as follows:
1. Participants will receive a puzzle.
2. Participants can ask questions to obtain clues and try to solve the puzzle.
3. For each question, the host will answer with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning.
4. The answer cannot include any additional information, nor can any word in the options be omitted. For example, “No” cannot be shortened to “N”.
5. Participants need to infer and ultimately find the correct answer to the puzzle based on the responses.
Please strictly adhere to these rules when answering participants’ questions.
Puzzle: {}
Actual situation: {}
Question from participants: {}"""
P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。
2. 主持人知道谜底,谜底是谜面的答案。
3. 参与者可以询问任何封闭式问题来找寻事件的真相。
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下:
- 若谜面和谜底能找到问题的答案,回答:是或者不是
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误
- 若参与者提问基本还原了谜底真相,回答:回答正确
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
请严格按照这些规则回答参与者提出的问题。
**谜面:** {}
**谜底:** {}
**参与者提出的问题:** {}
"""
P2_en = """You are the host of a situational guessing game. The rules of the game are as follows:
1. Participants will receive a riddle that describes a simple yet difficult to understand event.
2. The host knows the answer, which is the solution to the riddle.
3. Participants can ask any closed-ended questions to uncover the truth of the event.
4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows:
- If the riddle and answer can provide an answer to the question, respond with: Yes or No
- If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant
- If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning
- If the participant's question essentially reveals the truth of the answer, respond with: Correct answer
5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N".
Please strictly follow these rules when answering the participant's questions.
**Riddle:** {}
**Answer:** {}
**Participant's question:** {}
"""
system_prompt = "You are an expert in logical reasoning."
def get_prompt_template(using_p1=True, chinese_prompt=True):
if using_p1:
return P1 if chinese_prompt else P1_en
else:
return P2 if chinese_prompt else P2_en
def extract_answer(text, debug=False):
if text and isinstance(text, str):
# Remove the begin and end tokens
text = re.sub(
r".*?(assistant|\[/INST\]).+?\b",
"",
text,
flags=re.DOTALL | re.MULTILINE,
)
if debug:
print("--------\nstep 1:", text)
text = re.sub(r"<.+?>.*", "", text, flags=re.DOTALL | re.MULTILINE)
if debug:
print("--------\nstep 2:", text)
text = re.sub(
r".*?end_header_id\|>\n\n", "", text, flags=re.DOTALL | re.MULTILINE
)
if debug:
print("--------\nstep 3:", text)
text = text.split(".")[0].strip()
text = text.split("。")[0].strip()
if debug:
print("--------\nstep 4:", text)
text = re.sub(
r"^Response:.+?\b",
"",
text,
flags=re.DOTALL | re.MULTILINE,
)
if debug:
print("--------\nstep 5:", text)
return text.strip()
return ""
def calc_metrics(references, predictions, debug=False):
assert len(references) == len(
predictions
), f"lengths are difference: {len(references)} != {len(predictions)}"
labels = np.unique(references)
valid_classifications = [1 if p in labels else 0 for p in predictions]
predictions = [extract_answer(text) for text in predictions]
accuracy = accuracy_score(references, predictions)
results = {"accuracy": accuracy}
if debug:
incorrect_ids = [i for i, p in enumerate(predictions) if p != references[i]]
results["incorrect_ids"] = incorrect_ids
precision = precision_score(
references, predictions, average="weighted", labels=labels
)
results["precision"] = float(precision)
recall = recall_score(references, predictions, average="weighted", labels=labels)
results["recall"] = float(recall)
f1 = f1_score(references, predictions, average="weighted", labels=labels)
results["f1"] = float(f1)
results["ratio_valid_classifications"] = sum(valid_classifications) / len(
valid_classifications
)
return results
def save_results(model_name, results_path, dataset, predictions, debug=False):
if not os.path.exists(results_path):
# Get the directory part of the file path
dir_path = os.path.dirname(results_path)
# Create all directories in the path (if they don't exist)
os.makedirs(dir_path, exist_ok=True)
if isinstance(dataset, pd.DataFrame):
df = dataset
else:
df = dataset.to_pandas()
df.drop(columns=["answer", "prompt", "train_text"], inplace=True, errors="ignore")
else:
df = pd.read_csv(results_path, on_bad_lines="warn")
df[model_name] = predictions
if debug:
print(df.head(1))
df.to_csv(results_path, index=False)
def load_logical_reasoning_dataset(
data_path, tokenizer=None, using_p1=True, chinese_prompt=True, test_data=None
):
postfix = "" if chinese_prompt else "_en"
train_data_file = data_path + f"/train{postfix}.csv"
test_data_file = data_path + f"/{test_data if test_data else 'dev'}{postfix}.csv"
print("loading train/test data files")
datasets = load_dataset(
"csv",
data_files={"train": train_data_file, "test": test_data_file},
)
if tokenizer:
reasoning_prompt = get_prompt_template(using_p1, chinese_prompt)
def formatting_prompts_func(examples):
inputs = examples["text"]
outputs = examples["label"]
puzzles = examples["puzzle"]
truths = examples["truth"]
messages = [
{
"role": "system",
"content": system_prompt,
},
None,
]
model_name = os.getenv("MODEL_NAME")
if "mistral" in model_name.lower() or "gemma" in model_name.lower():
messages = messages[1:]
texts = []
prompts = []
for input, output, puzzle, truth in zip(inputs, outputs, puzzles, truths):
prompt = reasoning_prompt.format(puzzle, truth, input)
messages[-1] = {"role": "user", "content": prompt}
prompt = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
prompts.append(prompt)
texts.append(prompt + output + tokenizer.eos_token if output else "")
return {"train_text": texts, "prompt": prompts}
datasets = datasets.map(
formatting_prompts_func,
batched=True,
)
print(datasets)
return datasets
def get_metrics(df):
metrics_df = pd.DataFrame(df.columns.T)[2:]
metrics_df.rename(columns={0: "model"}, inplace=True)
metrics_df["model"] = metrics_df["model"].apply(lambda x: x.split("/")[-1])
metrics_df.reset_index(inplace=True)
metrics_df = metrics_df.drop(columns=["index"])
accuracy = []
meteor = []
bleu_1 = []
rouge_l = []
all_metrics = []
for col in df.columns[2:]:
metrics = calc_metrics(df["label"], df[col], debug=True)
print(f"{col}: {metrics}")
accuracy.append(metrics["accuracy"])
all_metrics.append(metrics)
metrics_df["accuracy"] = accuracy
metrics_df["all_metrics"] = all_metrics
return metrics_df
def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False):
alpaca_data_path = (
"llama-factory/data/alpaca_mgtv_p1.json"
if using_p1
else "llama-factory/data/alpaca_mgtv_p2.json"
)
if use_english_datasets:
alpaca_data_path = alpaca_data_path.replace(".json", "_en.json")
if os.path.exists(alpaca_data_path):
print("loading existing data from:", alpaca_data_path)
data = pd.read_json(alpaca_data_path, orient="records", lines=False)
return data
print("loading new data from:", alpaca_data_path)
chinese_prompt = not use_english_datasets
datasets = load_logical_reasoning_dataset(
data_path, using_p1=using_p1, chinese_prompt=chinese_prompt
)
prompt_template = get_prompt_template(using_p1, chinese_prompt)
df_train = datasets["train"].to_pandas()
df_train["instruction"] = df_train.apply(
lambda x: prompt_template.format(x["puzzle"], x["truth"], x["text"]), axis=1
)
df_alpaca = pd.DataFrame(
{"instruction": [""] * len(df_train), "input": [""] * len(df_train)}
)
df_alpaca["instruction"] = df_train["instruction"]
df_alpaca["output"] = df_train["label"]
df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2)
return df_alpaca
def plot_value_counts(df, column_name, offset=0.1, title=None, preprocess_func=None):
font_family = rcParams["font.family"]
# Set the font to SimHei to support Chinese characters
rcParams["font.family"] = "STHeiti"
rcParams["axes.unicode_minus"] = (
False # This is to support the minus sign in Chinese.
)
if preprocess_func:
df["backup"] = df[column_name]
df[column_name] = df[column_name].apply(preprocess_func)
plt.figure(figsize=(12, 6))
df[column_name].value_counts().plot(kind="bar")
# add values on top of bars
for i, v in enumerate(df[column_name].value_counts()):
plt.text(i, v + offset, str(v), ha="center")
plt.xlabel(title or column_name)
plt.show()
rcParams["font.family"] = font_family
if preprocess_func:
df[column_name] = df["backup"]
df.drop(columns=["backup"], inplace=True)
def calc_metrics_for_col(df, col):
metrics = calc_metrics(df["label"], df[col], debug=True)
return metrics["accuracy"], metrics["precision"], metrics["recall"], metrics["f1"]
def get_metrics_df(df):
perf_df = pd.DataFrame(
columns=["epoch", "model", "accuracy", "precision", "recall", "f1"]
)
for i, col in enumerate(df.columns[5:]):
metrics = calc_metrics(df["label"], df[col], debug=False)
new_model_metrics = {
"epoch": i / 5,
"model": col,
}
new_model_metrics.update(metrics)
# Convert the dictionary to a DataFrame and concatenate it with the existing DataFrame
perf_df = pd.concat(
[perf_df, pd.DataFrame([new_model_metrics])], ignore_index=True
)
return perf_df
def plot_metrics(perf_df, model_name):
fig, ax = plt.subplots(1, 1, figsize=(12, 6))
# Ensure the lengths of perf_df["epoch"], perf_df["accuracy"], and perf_df["f1"] are the same
min_length = min(
len(perf_df["epoch"]), len(perf_df["accuracy"]), len(perf_df["f1"])
)
perf_df = perf_df.iloc[:min_length]
# Plot accuracy and f1 on the same chart with different markers
ax.plot(perf_df["epoch"], perf_df["accuracy"], marker="o", label="Accuracy")
ax.plot(
perf_df["epoch"], perf_df["f1"], marker="s", label="F1 Score"
) # Square marker for F1 Score
# Add values on top of points
for i in range(min_length):
ax.annotate(
f"{perf_df['accuracy'].iloc[i]*100:.2f}%",
(perf_df["epoch"].iloc[i], perf_df["accuracy"].iloc[i]),
ha="center",
va="bottom", # Move accuracy numbers below the points
xytext=(0, -15),
textcoords="offset points",
fontsize=10,
)
ax.annotate(
f"{perf_df['f1'].iloc[i]*100:.2f}%",
(perf_df["epoch"].iloc[i], perf_df["f1"].iloc[i]),
ha="center",
va="top", # Move F1 score numbers above the points
xytext=(0, 15), # Offset by 15 points vertically
textcoords="offset points",
fontsize=10,
)
# Set y-axis limit
# ax.set_ylim(0.49, 0.825)
# Add title and labels
ax.set_xlabel("Epoch (0: base model, 0.2 - 2: fine-tuned models)")
ax.set_ylabel("Accuracy and F1 Score")
# Set x-axis grid spacing to 0.2
ax.xaxis.set_major_locator(MultipleLocator(0.2))
ax.set_title(f"Performance Analysis Across Checkpoints for the {model_name} Model")
# Rotate x labels
plt.xticks(rotation=0)
plt.grid(True)
# plt.tight_layout()
# Set legend at the right to avoid overlapping with lines
plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5))
plt.show()
def reasoning_with_openai(
row, user_prompt, max_tokens=None, model="gpt-4o-mini", base_url=None
):
llm = ChatOpenAI(
model=model,
temperature=0,
max_tokens=max_tokens,
timeout=None,
max_retries=2,
base_url=base_url,
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
system_prompt,
),
(
"human",
user_prompt.format(row["puzzle"], row["truth"], row["text"]),
),
]
)
chain = prompt | llm
response = chain.invoke(input={})
return response.content
def eval_openai(eval_dataset, model="gpt-4o-mini", max_new_tokens=300):
user_prompt = get_prompt_template(using_p1=False, chinese_prompt=True)
total = len(eval_dataset)
predictions = []
for i in tqdm(range(total)):
output = reasoning_with_openai(eval_dataset.iloc[i], user_prompt,model=model, max_tokens=max_new_tokens)
predictions.append(output)
return predictions