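# NOTE: the four lines below are page-header residue from the site this file
# was copied from; they are kept as comments so they are not parsed as code.
# GreenPLM / pointllm /eval /evaluator_opensource_llm_QwenAPI.py
# YuanTang96's picture
# 1
# b30c1d8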
import argparse
import json
import time
from http import HTTPStatus
from dashscope import Generation
from tqdm import tqdm
from multiprocessing import Pool
import random
from gradio_client import Client
from concurrent.futures import ThreadPoolExecutor
# from openai import OpenAI
random.seed(0)
import re
import os
# Module-level client placeholder. start_evaluation() assigns its own local
# variable of the same name, so this global stays None within this file.
MY_client = None
# Judge prompt for open-vocabulary (free-form) classification.
# Placeholders filled with str.format: {ground_truth} and {model_output}.
# The judge is asked to answer 'T' or 'F', then '#', then a short rationale;
# OpenAIOpenFreeFormClsEvaluator.parse_gpt_response_evaluate reads the first
# character as the verdict and everything from index 2 on as the reason.
open_free_from_cls_prompt = """Analyze two sentences and determine if they're referring to the same general object or concept, only focusing on the type of object and category, not attributes such as color, size, or shape. Ignore 3D model-related adjectives such as "cartoon-style", "toy". Respond with 'T' if they refer to the same big category and 'F' if not. Also, provide a brief rationale (no more than 20 words) for your judgment.
Example:
Input: 1. Spiral staircase that goes from a ground floor. 2. This is a 3D model of wooden stairs in light brown
Output: T#Both refer to a staircase.
Input: 1. A white and red van. 2. This is a 3D model of a toy cartoon-style truck
Output: T# Both refer to a car, they are in the same big category.
Now, analyze the following:
Input: 1. {ground_truth} 2. {model_output}
Output: """ # * about 230 input tokens
# Judge prompt for closed-set classification.
# The only placeholder in this template is {candidate_lists}; the category
# count ("40") is hard-coded in the text. OpenAICloseSetClsEvaluator.__init__
# formats the template and then appends "{model_output}\nOutput: " so that the
# description can be filled in per sample.
# Expected reply format: 'index#class#short reason'; '-1#NA#Cannot infer' marks
# a description that cannot be mapped to any category.
close_set_cls_prompt = """Given the following free-form description of a 3D object, please determine the most probable class index from the following 40 available categories, even if the description doesn't clearly refer to any one of them. Make your best-educated guess based on the information provided. If the description already contains a valid index, then the index should be selected. If it contains more than one valid index, then randomly select one index (specify your reason). If there is no valid index and it cannot be inferred from the information, return '-1#NA#Cannot infer'.
Categories:
{candidate_lists}
Reply with the format of 'index#class#short reason (no more than 10 words)'.
Examples:
Input: This is a 3D object model of a cartoon white truck.
Output: 7#car#Closest match to 'car' in categories.
Input: A green leaf in a flower pot.
Output: 26#plant#The primary subject 'leaf' directly indicates a plant.
Input: It's difficult to determine the exact type of this object due to insufficient details. But it seems to be like a piece of furniture.
Output: 33#table#Randomly select one kind of furniture from the list.
Input: I cannot determine the specific type of the object without additional information or context.
Output: -1#NA#Cannot infer.
Now analyze the following:
Input: """
# Judge prompt for object captioning.
# Placeholders filled with str.format: {ground_truth} and {model_output}.
# Expected reply format: 'score#reason' with an integer score in 0-100.
object_captioning_prompt = """Evaluate a model-generated caption against a ground-truth caption for a 3D model. Identify the aspects mentioned in theground-truth caption and calculate the percentage of these aspects correctly mentioned or partially matched in the model caption. Score from 0 to 100, where each aspect contributes equally to the score. Consider similar concepts for partial score.
Provide your score (0-100) and a short justification (less than 15 words) in the format of 'score#reason'
Example:
Ground Truth:: A white brown skeleton
Model: This is a 3D model of a small, cartoon-like robot. It has a spherical body and is covered in a layer of white dust.
Output: 50#mention white; skeleton and robot have similar appearence.
Now score the following:
Ground Truth: {ground_truth}
Model: {model_output}
Output: """
# Aliases used by the evaluator classes below; each one is bound to the same
# string object as the prompt it is assigned from.
LLM_object_captioning_prompt = object_captioning_prompt
LLM_open_free_from_cls_prompt = open_free_from_cls_prompt
LLM_close_set_cls_prompt = close_set_cls_prompt
# Per-model token prices in USD per 1,000 tokens, keyed by model name.
# * check https://openai.com/pricing for updated price
# The "HF" entry is the zero-cost row the evaluators in this file read.
GPT_PRICES = {
    model_name: {
        "price_1k_prompt_tokens": prompt_price,
        "price_1k_completion_tokens": completion_price,
    }
    for model_name, prompt_price, completion_price in (
        ("gpt-3.5-turbo-0125", 0.0005, 0.0015),
        ("gpt-3.5-turbo-0613", 0.0015, 0.002),
        ("gpt-3.5-turbo-1106", 0.0010, 0.002),
        ("gpt-4-0613", 0.03, 0.06),
        ("gpt-4-1106-preview", 0.01, 0.03),
        ("HF", 0, 0),
    )
}
class OpenAIOpenFreeFormClsEvaluator():
    """LLM-as-judge evaluator for open-vocabulary (free-form) classification.

    For every sample the judge model is shown the ground-truth text and the
    model output and must answer 'T' or 'F' followed by a short rationale.
    Progress can be checkpointed to a temporary JSON file and resumed.
    """

    def __init__(self, inputs, output_dir, output_file, model_type="Qwen/Qwen2-72B-Instruct", client=None):
        """
        Args:
            inputs: A dictionary containing the results of the evaluation. It contains two keys: "results" and "prompt".
                "prompt": str
                "results": [
                    {
                        "object_id": str,
                        "model_output": str,
                        "ground_truth": str
                    }
                ]
            output_dir: directory in which result files are written.
            output_file: file name of the final results JSON (inside output_dir).
            model_type: model name passed to ``Generation.call``.
            client: stored on the instance; not used by the methods of this class.
        """
        print("-" * 80)
        print("Initializing OpenAIEvaluator...")
        self.results = inputs['results']  # * list of samples still to be judged
        self.inference_prompt = inputs['prompt']  # * used to prompt PointLLM

        self.correct_predictions = 0
        self.total_predictions = 0
        self.invalid_responses = 0
        self.response_data = []  # one record per judged sample
        self.model_type = model_type
        self.check_model_type()
        self.client = client

        self.prompt_tokens = 0
        self.completion_tokens = 0

        # Printed for logging only: get_relpy_from_llm passes its own sampling
        # parameters to Generation.call and does not read this dictionary.
        self.default_chat_parameters = {
            "model": model_type,
            "temperature": 1,
            "top_p": 1,
            "max_tokens": 2048
        }

        # * price
        self.price_1k_prompt_tokens = GPT_PRICES["HF"]["price_1k_prompt_tokens"]
        self.price_1k_completion_tokens = GPT_PRICES["HF"]["price_1k_completion_tokens"]

        print(f"OpenAIGPT config: ")
        print(self.default_chat_parameters)

        self.gpt_prompt = LLM_open_free_from_cls_prompt
        self.output_dir = output_dir
        self.output_file = output_file
        self.temp_output_file = self.output_file.replace(".json", "_processed_temp.json")

    def get_relpy_from_llm(self, input_sentence):
        """Send one prompt to the judge model and return its text reply.

        Args:
            input_sentence: the fully formatted prompt.

        Returns:
            The content of the first choice of the response.

        Raises:
            RuntimeError: if the API reports a non-OK status. Previously a
                failed call surfaced as an opaque AttributeError when the
                missing output was dereferenced.
        """
        query_input = input_sentence.replace("the rocket", "this rocket")
        query_input = query_input.replace("The rocket", "This rocket")
        time.sleep(0.2)  # small pause before every request

        messages = [{'role': 'system', 'content': 'You are a helpful assistant.'},
                    {'role': 'user', 'content': query_input}]
        response = Generation.call(model=self.model_type,
                                   messages=messages,
                                   # fixed seed so that repeated runs are comparable
                                   seed=1234,
                                   temperature=0,
                                   top_p=0.8,
                                   top_k=50,
                                   # return the output in "message" format
                                   result_format='message')
        if response.status_code != HTTPStatus.OK:
            raise RuntimeError(
                f"LLM request failed with status {response.status_code}: "
                f"code={getattr(response, 'code', None)}, message={getattr(response, 'message', None)}")
        return response.output.choices[0].message.content

    def check_model_type(self):
        """Hook for validating ``self.model_type``; intentionally a no-op here."""
        pass

    def resume_processing(self):
        """Reload counters and records from the temporary file, if it exists,
        and drop the already-processed samples from ``self.results``."""
        processed_results_path = os.path.join(self.output_dir, self.temp_output_file)
        if not os.path.exists(processed_results_path):
            return

        print("-" * 80)
        # * print resuming
        print(f"Resuming processing...")
        print(f"Loading processed results from {processed_results_path}...")
        with open(processed_results_path, "r") as f:
            saved_results = json.load(f)
        self.correct_predictions = saved_results["correct_predictions"]
        self.total_predictions = saved_results["total_predictions"]
        self.invalid_responses = saved_results["invalid_responses"]
        self.response_data = saved_results["results"]
        self.prompt_tokens = saved_results["prompt_tokens"]
        self.completion_tokens = saved_results["completion_tokens"]

        print(f"Processed results: {len(self.response_data)}")
        # * print the length of all the data
        print(f"Total results: {len(self.results)}")

        # * remove processed data (set lookup keeps the filter linear)
        processed_ids = {d['object_id'] for d in self.response_data}
        self.results = [r for r in self.results if r['object_id'] not in processed_ids]

        print(f"Remaining results: {len(self.results)}")

    def remove_temp_file(self):
        """Delete the temporary checkpoint file if it exists."""
        processed_results_path = os.path.join(self.output_dir, self.temp_output_file)
        if os.path.exists(processed_results_path):
            os.remove(processed_results_path)
            print("-" * 80)
            print(f"Removed Temporary file {processed_results_path}")

    def parse_gpt_response_evaluate(self, gpt_response):
        """Parse a 'T#reason' / 'F#reason' reply.

        This method does not touch any counter: it runs on worker threads in
        parallel mode, so invalid replies are counted by the aggregation code
        on the calling thread instead (they used to be counted twice).

        Returns:
            (accuracy, cls_result, reason). ``cls_result`` is 'T', 'F' or
            'INVALID'; for 'INVALID' the accuracy is 0 and ``reason`` is the
            stripped reply. An empty reply is reported as 'INVALID'.
        """
        gpt_response = (gpt_response or "").strip()
        cls_result = gpt_response[:1].upper()  # slicing is safe on an empty reply
        if cls_result not in ('T', 'F'):
            return 0, "INVALID", gpt_response

        reason = gpt_response[2:] if len(gpt_response) > 2 else ""
        accuracy = 1 if cls_result == 'T' else 0
        return accuracy, cls_result, reason

    def evaluate_result(self, result):
        """Judge one sample.

        Returns:
            (object_id, model_output, ground_truth, accuracy, cls_result,
            reason, prompt_tokens, completion_tokens). The token counts are
            always 0 because usage is not read from the response.
        """
        object_id = result['object_id']
        ground_truth = result['ground_truth']
        model_output = result['model_output']
        messages = self.gpt_prompt.format(ground_truth=ground_truth, model_output=model_output)

        gpt_response = self.get_relpy_from_llm(messages)

        prompt_tokens = 0
        completion_tokens = 0

        accuracy, cls_result, reason = self.parse_gpt_response_evaluate(gpt_response)
        return object_id, model_output, ground_truth, accuracy, cls_result, reason, prompt_tokens, completion_tokens

    def _record_result(self, evaluated):
        """Fold one ``evaluate_result`` tuple into the counters and records."""
        (object_id, model_output, ground_truth, accuracy, cls_result, reason,
         prompt_tokens, completion_tokens) = evaluated

        self.correct_predictions += accuracy
        self.total_predictions += 1
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        if cls_result == 'INVALID':
            self.invalid_responses += 1

        # save the object_id, model_output, ground_truth, gpt_cls_result and gpt_reason for each result
        self.response_data.append({
            'object_id': object_id,
            'ground_truth': ground_truth,
            'model_output': model_output,
            'gpt_cls_result': cls_result,
            'gpt_reason': reason
        })

    def evaluate(self):
        """Judge all remaining samples one by one.

        On any error (including Ctrl-C) the progress made so far is written to
        the temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting single-thread evaluation...")
        results = self.results

        try:
            for result in tqdm(results):
                self._record_result(self.evaluate_result(result))

            print("Evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during evaluation. Saving processed results to temporary file...")
            self.save_results(is_temp=True)
            exit()

    def parallel_evaluate(self, num_workers=20):
        """Judge all remaining samples with a pool of ``num_workers`` threads.

        Results are consumed in submission order and aggregated on the calling
        thread. On any error the progress made so far is written to the
        temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting parallel evaluation...")
        results = self.results

        try:
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                with tqdm(total=len(results)) as pbar:
                    # submit every sample up front, then collect in order
                    futures = [executor.submit(self.evaluate_result, result) for result in results]
                    for future in futures:
                        self._record_result(future.result())
                        pbar.update()

            print("Parallel evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during parallel evaluation. Saving processed results to temporary file...")
            self.save_results(is_temp=True)
            exit()

    def _compute_accuracy(self):
        """Return the accuracy in percent over valid replies (0 if there are none)."""
        valid_predictions = self.total_predictions - self.invalid_responses
        if valid_predictions == 0:
            return 0  # * no results and get error
        return self.correct_predictions / valid_predictions * 100

    def save_results(self, is_temp=False):
        """Write counters and per-sample records to JSON.

        Args:
            is_temp: write to the temporary checkpoint file instead of the
                final output file.
        """
        file_name = self.temp_output_file if is_temp else self.output_file
        output_path = os.path.join(self.output_dir, file_name)
        accuracy = self._compute_accuracy()

        results_to_save = {
            'inference_prompt': self.inference_prompt,
            'prompt': self.gpt_prompt,
            'accuracy': f"{accuracy:.2f}%",
            'total_predictions': self.total_predictions,
            'correct_predictions': self.correct_predictions,
            'invalid_responses': self.invalid_responses,
            'prompt_tokens': self.prompt_tokens,
            'completion_tokens': self.completion_tokens,
            'GPT_cost': self.get_costs(),
            'results': self.response_data,
        }
        with open(output_path, 'w') as f:
            json.dump(results_to_save, f, indent=2)

        print(f"Results saved to {output_path}")
        # * print the length of saved results
        print(f"Saved {len(self.response_data)} results in total.")

    def print_results(self):
        """Print the summary statistics and the token costs."""
        print('-' * 80)
        accuracy = self._compute_accuracy()
        print("Results:")
        print(f"Accuracy: {accuracy:.2f}%")
        print(f"Total Predictions: {self.total_predictions}")
        print(f"Correct Predictions: {self.correct_predictions}")
        print(f"Invalid Responses: {self.invalid_responses}")
        self.print_costs()

    def print_costs(self):
        """Print the prompt and completion token costs in USD."""
        print(f"Prompt Tokens Price: {self.prompt_tokens * self.price_1k_prompt_tokens / 1000:.2f} USD")
        print(f"Completion Tokens Price: {self.completion_tokens * self.price_1k_completion_tokens / 1000:.2f} USD")

    def get_costs(self):
        """Return the total token cost (prompt + completion)."""
        return self.prompt_tokens * self.price_1k_prompt_tokens / 1000 + self.completion_tokens * self.price_1k_completion_tokens / 1000
class OpenAICloseSetClsEvaluator(OpenAIOpenFreeFormClsEvaluator):
    """LLM-as-judge evaluator for closed-set classification.

    The judge maps a free-form description to one index of a fixed category
    list. An unusable reply is replaced by a random index and labelled
    "INVALID", so two figures are reported: ``accuracy`` over all samples and
    ``clean_accuracy`` over the samples with a valid reply only.
    """

    def __init__(self, inputs, output_dir, output_file, model_type="gpt-3.5-turbo-0613", client=None):
        """Load the category names and build the per-sample prompt template.

        Raises:
            OSError: if the category file cannot be read. (Previously the
                error was swallowed by a bare ``except`` and the constructor
                failed a few lines later with an AttributeError.)
        """
        super().__init__(inputs, output_dir, output_file, model_type, client=client)
        self.gpt_prompt = LLM_close_set_cls_prompt

        self.invalid_correct_predictions = 0  # * random choice and correct coincidently

        # * import category names: one name per line, line order defines the index
        catfile = os.path.join(os.path.dirname(__file__),
                               '../data/modelnet_config/modelnet40_shape_names_modified.txt')  # * i.e. pointllm/data/modelnet_config/modelnet40_shape_names_modified.txt
        try:
            with open(catfile) as f:
                self.candidate_lists_names = [line.strip() for line in f]  # * list of category names
        except OSError:
            print(f"Current categories file is {catfile}. Need to move the category file to pointllm/eval/configs/.")
            raise

        # * make the prompt
        candidate_lists = [f'{i}: {cat}' for i, cat in enumerate(self.candidate_lists_names)]
        self.num_categories = len(candidate_lists)
        self.candidate_lists = '\n'.join(candidate_lists)
        # After this call the only remaining placeholder is {model_output}.
        self.gpt_prompt = self.gpt_prompt.format(num_categories=self.num_categories,
                                                 candidate_lists=self.candidate_lists) + "{model_output}\nOutput: "

    def check_model_type(self):
        """No model check is needed for this task."""
        return

    def resume_processing(self):
        """Reload counters and records from the temporary file, if it exists,
        and drop the already-processed samples from ``self.results``."""
        processed_results_path = os.path.join(self.output_dir, self.temp_output_file)
        if not os.path.exists(processed_results_path):
            return

        print("-" * 80)
        # * print resuming
        print(f"Resuming processing...")
        print(f"Loading processed results from {processed_results_path}...")
        with open(processed_results_path, "r") as f:
            saved_results = json.load(f)
        self.correct_predictions = saved_results["correct_predictions"]
        self.total_predictions = saved_results["total_predictions"]
        self.invalid_responses = saved_results["invalid_responses"]
        self.invalid_correct_predictions = saved_results["invalid_correct_predictions"]
        self.response_data = saved_results["results"]
        self.prompt_tokens = saved_results["prompt_tokens"]
        self.completion_tokens = saved_results["completion_tokens"]

        print(f"Processed results: {len(self.response_data)}")
        # * print the length of all the data
        print(f"Total results: {len(self.results)}")

        # * remove processed data (set lookup keeps the filter linear)
        processed_ids = {d['object_id'] for d in self.response_data}
        self.results = [r for r in self.results if r['object_id'] not in processed_ids]

        print(f"Remaining results: {len(self.results)}")

    def parse_gpt_response_evaluate(self, gpt_response, ground_truth):
        """Parse an 'index#label#short_reason' reply and score it.

        This method does not touch any counter: invalid replies are counted
        once by the aggregation code (they used to be counted here as well,
        which doubled ``invalid_responses`` and skewed ``clean_accuracy``).

        Args:
            gpt_response: str, the raw reply of the judge.
            ground_truth: the ground-truth class index, compared with ``==``
                against the parsed integer index.

        Returns:
            (accuracy, cls_result, cls_label, reason). When the reply is
            unusable, ``cls_result`` is a random index, ``cls_label`` is
            "INVALID" and ``reason`` is the stripped reply.
        """
        # * use regular expression to extract
        pattern = r'(\d+#[^#]*#.*$)'
        match = re.search(pattern, gpt_response)
        gpt_response = match.group(1) if match else gpt_response

        gpt_response = gpt_response.strip()
        # maxsplit=2 keeps any '#' that occurs inside the reason text
        gpt_response_list = gpt_response.split('#', 2)

        cls_result = gpt_response_list[0]
        cls_label = gpt_response_list[1] if len(gpt_response_list) > 1 else ""
        reason = gpt_response_list[2] if len(gpt_response_list) > 2 else ""

        try:
            # * convert to int
            cls_result = int(cls_result)
            if cls_result not in range(self.num_categories) or cls_label == "NA":
                # * not valid range
                cls_result = -1
        except ValueError:
            print(f"Error: unale to parse {gpt_response}.")
            cls_result = -1

        if cls_result == -1:
            # * random choose one index from 0 to self.num_categories
            cls_result = random.choice(range(self.num_categories))
            cls_label = "INVALID"
            reason = gpt_response

        accuracy = 1 if cls_result == ground_truth else 0
        return accuracy, cls_result, cls_label, reason

    def evaluate_result(self, result):
        """Judge one sample.

        Returns:
            (object_id, model_output, ground_truth, accuracy, cls_result,
            cls_label, reason, ground_truth_label, prompt_tokens,
            completion_tokens). The token counts are always 0 because usage
            is not read from the response.
        """
        object_id = result.get('object_id', -1)
        ground_truth = result['ground_truth']
        ground_truth_label = result['label_name']
        model_output = result['model_output']

        messages = self.gpt_prompt.format(model_output=model_output)
        gpt_response = self.get_relpy_from_llm(messages)

        prompt_tokens = 0
        completion_tokens = 0

        accuracy, cls_result, cls_label, reason = self.parse_gpt_response_evaluate(gpt_response, ground_truth)
        return object_id, model_output, ground_truth, accuracy, cls_result, cls_label, reason, ground_truth_label, prompt_tokens, completion_tokens

    def _record_result(self, evaluated):
        """Fold one ``evaluate_result`` tuple into the counters and records."""
        (object_id, model_output, ground_truth, accuracy, cls_result, cls_label, reason,
         ground_truth_label, prompt_tokens, completion_tokens) = evaluated

        self.correct_predictions += accuracy
        self.total_predictions += 1
        if cls_label == "INVALID":
            self.invalid_correct_predictions += accuracy
            self.invalid_responses += 1
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens

        # save the object_id, model_output, ground_truth, gpt_cls_result and gpt_reason for each result
        self.response_data.append({
            'object_id': object_id,
            'ground_truth': ground_truth,
            'gpt_cls_result': cls_result,
            'ground_truth_label': ground_truth_label,
            'gpt_cls_label': cls_label,
            'model_output': model_output,
            'gpt_reason': reason,
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens
        })

    def evaluate(self):
        """Judge all remaining samples one by one.

        On any error (including Ctrl-C) the progress made so far is written to
        the temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting single-thread evaluation...")
        results = self.results

        result = None  # keeps the error report below valid if no sample was started
        try:
            for result in tqdm(results):
                self._record_result(self.evaluate_result(result))

            print("Evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during evaluation. Saving processed results to temporary file...")
            print(f"Current sample is {result}.")
            self.save_results(is_temp=True)
            exit()

    def parallel_evaluate(self, num_workers=20):
        """Judge all remaining samples with a pool of ``num_workers`` threads.

        Results are consumed in submission order and aggregated on the calling
        thread. On any error the progress made so far is written to the
        temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting parallel evaluation...")
        results = self.results

        try:
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                with tqdm(total=len(results)) as pbar:
                    # submit every sample up front, then collect in order
                    futures = [executor.submit(self.evaluate_result, result) for result in results]
                    for future in futures:
                        self._record_result(future.result())
                        pbar.update()  # update the progress bar

            print("Parallel evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during parallel evaluation. Saving processed results to temporary file...")
            self.save_results(is_temp=True)
            exit()

    def _compute_accuracies(self):
        """Return ``(accuracy, clean_accuracy)`` in percent.

        ``accuracy`` is over all predictions (random fallbacks included);
        ``clean_accuracy`` is over valid replies only. Each value is 0 when
        its denominator is 0, so neither reporting method can fail on an
        empty or all-invalid run.
        """
        if self.total_predictions == 0:
            accuracy = 0  # * no results and get error
        else:
            accuracy = self.correct_predictions / self.total_predictions * 100

        valid_predictions = self.total_predictions - self.invalid_responses
        if valid_predictions == 0:
            clean_accuracy = 0
        else:
            clean_accuracy = (self.correct_predictions - self.invalid_correct_predictions) / valid_predictions * 100
        return accuracy, clean_accuracy

    def save_results(self, is_temp=False):
        """Write counters and per-sample records to JSON.

        Args:
            is_temp: write to the temporary checkpoint file instead of the
                final output file.
        """
        file_name = self.temp_output_file if is_temp else self.output_file
        output_path = os.path.join(self.output_dir, file_name)
        accuracy, clean_accuracy = self._compute_accuracies()

        results_to_save = {
            'inference_prompt': self.inference_prompt,
            'prompt': self.gpt_prompt,
            'accuracy': f"{accuracy:.2f}%",
            'clean_accuracy': f"{clean_accuracy:.2f}%",
            'total_predictions': self.total_predictions,
            'correct_predictions': self.correct_predictions,
            'invalid_correct_predictions': self.invalid_correct_predictions,
            'invalid_responses': self.invalid_responses,
            'prompt_tokens': self.prompt_tokens,
            'completion_tokens': self.completion_tokens,
            'GPT_cost': self.get_costs(),
            'results': self.response_data,
        }
        with open(output_path, 'w') as f:
            json.dump(results_to_save, f, indent=2)

        print(f"Results saved to {output_path}")
        # * print the length of saved results
        print(f"Saved {len(self.response_data)} results in total.")

    def print_results(self):
        """Print the summary statistics and the token costs."""
        print('-' * 80)
        accuracy, clean_accuracy = self._compute_accuracies()
        print("Results:")
        print(f"Accuracy: {accuracy:.2f}%")
        print(f"Clean Accuracy: {clean_accuracy:.2f}%", )
        print(f"Total Predictions: {self.total_predictions}")
        print(f"Correct Predictions: {self.correct_predictions}")
        print(f"Invalid Correct Predictions: {self.invalid_correct_predictions}")
        print(f"Invalid Responses: {self.invalid_responses}")
        print(f"Prompt Tokens: {self.prompt_tokens}")
        print(f"Completion Tokens: {self.completion_tokens}")
        self.print_costs()
class OpenAIObjectCaptioningEvaluator(OpenAIOpenFreeFormClsEvaluator):
    """LLM-as-judge evaluator for object captioning.

    The judge scores each generated caption against the ground-truth caption
    with an integer in 0-100. Replies that cannot be parsed get the score -1,
    are counted as invalid and are excluded from the average.
    """

    def __init__(self, inputs, output_dir, output_file, model_type="gpt-4-0613", client=None):
        """Set up the base evaluator with the captioning prompt."""
        super().__init__(inputs, output_dir, output_file, model_type, client=client)
        self.gpt_prompt = LLM_object_captioning_prompt

        self.total_scores = 0  # sum of the valid scores

    def resume_processing(self):
        """Reload counters and records from the temporary file, if it exists,
        and drop the already-processed samples from ``self.results``."""
        processed_results_path = os.path.join(self.output_dir, self.temp_output_file)
        if not os.path.exists(processed_results_path):
            return

        print("-" * 80)
        # * print resuming
        print(f"Resuming processing...")
        print(f"Loading processed results from {processed_results_path}...")
        with open(processed_results_path, "r") as f:
            saved_results = json.load(f)
        # save_results stores the total as a formatted string
        self.total_scores = float(saved_results["total_score"])
        self.total_predictions = saved_results["total_predictions"]
        self.invalid_responses = saved_results["invalid_responses"]
        self.response_data = saved_results["results"]
        self.prompt_tokens = saved_results["prompt_tokens"]
        self.completion_tokens = saved_results["completion_tokens"]

        print(f"Processed results: {len(self.response_data)}")
        # * print the length of all the data
        print(f"Total results: {len(self.results)}")

        # * remove processed data (set lookup keeps the filter linear)
        processed_ids = {d['object_id'] for d in self.response_data}
        self.results = [r for r in self.results if r['object_id'] not in processed_ids]

        print(f"Remaining results: {len(self.results)}")

    def parse_gpt_response_evaluate(self, gpt_response, ground_truth):
        """Parse a 'score#reason' reply.

        Args:
            gpt_response: str, the raw reply of the judge.
            ground_truth: accepted for signature compatibility; not used.

        Returns:
            (gpt_score, reason). ``gpt_score`` is an int in 0-100, or -1 when
            the reply cannot be parsed or is out of range; in that case
            ``reason`` is the stripped reply.
        """
        # * use regular expression to extract
        pattern = r'(\d*#.*)'
        match = re.search(pattern, gpt_response)
        gpt_response = match.group(1) if match else gpt_response

        gpt_response = gpt_response.strip()
        # split on the first '#' only, so a '#' inside the reason is preserved
        score_text, _, reason = gpt_response.partition('#')

        try:
            # * convert to int
            gpt_score = int(score_text)
            if gpt_score not in range(101):  # * in 0-100
                # * not valid range
                gpt_score = -1
        except ValueError:
            print(f"Error: unale to parse {gpt_response}.")
            gpt_score = -1

        if gpt_score == -1:
            reason = gpt_response

        return gpt_score, reason

    def evaluate_result(self, result):
        """Judge one sample.

        Returns:
            (object_id, model_output, ground_truth, gpt_score, reason,
            prompt_tokens, completion_tokens). The token counts are always 0
            because usage is not read from the response.
        """
        object_id = result.get('object_id', -1)
        ground_truth = result['ground_truth']
        model_output = result['model_output']

        messages = self.gpt_prompt.format(ground_truth=ground_truth, model_output=model_output)
        gpt_response = self.get_relpy_from_llm(messages)

        prompt_tokens = 0
        completion_tokens = 0

        gpt_score, reason = self.parse_gpt_response_evaluate(gpt_response, ground_truth)
        return object_id, model_output, ground_truth, gpt_score, reason, prompt_tokens, completion_tokens

    def _record_result(self, evaluated):
        """Fold one ``evaluate_result`` tuple into the counters and records."""
        (object_id, model_output, ground_truth, gpt_score, reason,
         prompt_tokens, completion_tokens) = evaluated

        self.total_scores += gpt_score if gpt_score != -1 else 0
        self.total_predictions += 1
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        if gpt_score == -1:
            self.invalid_responses += 1

        # save the object_id, model_output, ground_truth, gpt_score and gpt_reason for each result
        self.response_data.append({
            'object_id': object_id,
            'ground_truth': ground_truth,
            'model_output': model_output,
            "gpt_score": gpt_score,
            'gpt_reason': reason
        })

    def evaluate(self):
        """Judge all remaining samples one by one.

        On any error (including Ctrl-C) the progress made so far is written to
        the temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting single-thread evaluation...")
        results = self.results

        try:
            for result in tqdm(results):
                self._record_result(self.evaluate_result(result))

            print("Evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during evaluation. Saving processed results to temporary file...")
            self.save_results(is_temp=True)
            exit()

    def parallel_evaluate(self, num_workers=20):
        """Judge all remaining samples with a pool of ``num_workers`` threads.

        Results are consumed in submission order and aggregated on the calling
        thread. On any error the progress made so far is written to the
        temporary file and the process exits.
        """
        self.resume_processing()
        print('-' * 80)
        print("Starting parallel evaluation...")
        results = self.results

        try:
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                with tqdm(total=len(results)) as pbar:
                    # submit every sample up front, then collect in order
                    futures = [executor.submit(self.evaluate_result, result) for result in results]
                    for future in futures:
                        self._record_result(future.result())
                        pbar.update()  # update the progress bar

            print("Parallel evaluation finished.")

            self.save_results()
            self.print_results()
            self.remove_temp_file()
        except (Exception, KeyboardInterrupt) as e:
            print(f"Error {e} occurred during parallel evaluation. Saving processed results to temporary file...")
            self.save_results(is_temp=True)
            exit()

    def _average_score(self):
        """Return the mean score over valid replies (0 if there are none)."""
        valid_predictions = self.total_predictions - self.invalid_responses
        if valid_predictions == 0:
            return 0  # * no results and get error
        return self.total_scores / valid_predictions

    def save_results(self, is_temp=False):
        """Write counters and per-sample records to JSON.

        Args:
            is_temp: write to the temporary checkpoint file instead of the
                final output file.
        """
        file_name = self.temp_output_file if is_temp else self.output_file
        output_path = os.path.join(self.output_dir, file_name)
        average_score = self._average_score()

        results_to_save = {
            'inference_prompt': self.inference_prompt,
            'gpt_prompt': self.gpt_prompt,
            'average_score': f"{average_score:.2f}",
            'total_score': f"{self.total_scores:.2f}",
            'total_predictions': self.total_predictions,
            'invalid_responses': self.invalid_responses,
            'prompt_tokens': self.prompt_tokens,
            'completion_tokens': self.completion_tokens,
            'GPT_cost': self.get_costs(),
            'results': self.response_data,
        }
        with open(output_path, 'w') as f:
            json.dump(results_to_save, f, indent=2)

        print(f"Results saved to {output_path}")
        # * print the length of saved results
        print(f"Saved {len(self.response_data)} results in total.")

    def print_results(self):
        """Print the summary statistics and the token costs."""
        print('-' * 80)
        average_score = self._average_score()
        print("Results:")
        print(f"Average Score: {average_score:.2f}")
        print(f"Total Predictions: {self.total_predictions}")
        print(f"Invalid Responses: {self.invalid_responses}")
        print(f"Prompt Tokens: {self.prompt_tokens}")
        print(f"Completion Tokens: {self.completion_tokens}")
        self.print_costs()
def convert_model_name_to_spaces_url(model_name: str) -> str:
    """Build the ``https://<name>.hf.space`` URL for a model name.

    Every '/' in ``model_name`` becomes '-' and the result is lower-cased
    before it is placed in the host name.
    """
    host_label = "-".join(model_name.split("/")).lower()
    return "https://" + host_label + ".hf.space"
def start_evaluation(results, output_dir, output_file, eval_type="open-free-form-classification",
                     model_type="gpt-3.5-turbo-0613",
                     parallel=True, num_workers=20):
    """Build the evaluator selected by ``eval_type`` and run it.

    Args:
        results: dict or file path to the json file containing the dict
        output_dir: directory in which the evaluator writes its files.
        output_file: the path the final evaluation results to be saved.
        eval_type: "open-free-form-classification",
            "modelnet-close-set-classification" or "object-captioning".
        model_type: model name handed to the evaluator.
        parallel: run ``parallel_evaluate`` when true, ``evaluate`` otherwise.
        num_workers: worker count passed to ``parallel_evaluate``.

    Raises:
        NotImplementedError: if ``eval_type`` is not one of the three above.
    """
    if isinstance(results, str):
        with open(results, 'r') as results_file:
            results = json.load(results_file)

    # No remote client is created; every evaluator receives client=None.
    llm_client = None

    print("eval_type:", eval_type)

    supported_types = (
        "open-free-form-classification",
        "modelnet-close-set-classification",
        "object-captioning",
    )
    if eval_type not in supported_types:
        raise NotImplementedError(f"eval_type {eval_type} not supported.")

    # Evaluator classes listed in the same order as supported_types.
    evaluator_by_type = dict(zip(supported_types, (
        OpenAIOpenFreeFormClsEvaluator,
        OpenAICloseSetClsEvaluator,
        OpenAIObjectCaptioningEvaluator,
    )))
    evaluator = evaluator_by_type[eval_type](
        results, output_dir, output_file, model_type=model_type, client=llm_client)

    if not parallel:
        evaluator.evaluate()
        return
    evaluator.parallel_evaluate(num_workers=num_workers)
if __name__ == "__main__":
    # Command-line entry point: judge one results file and write
    # "<results>_evaluated_<model>.json" next to it (or into --output_dir).
    parser = argparse.ArgumentParser()
    parser.add_argument("--results_path", type=str, \
                        default="", help="Path to the results file.")
    parser.add_argument("--output_dir", type=str, default=None, help="Path to the output directory.")
    parser.add_argument("--model_type", type=str, default="Qwen/Qwen2-72B-Instruct",
                        help="Type of the model in hugging face used to evaluate.")
    # --parallel stays the default; it used to be impossible to turn off
    # because the flag was store_true with default=True. --no_parallel writes
    # to the same destination and selects the single-thread path.
    parser.add_argument("--parallel", dest="parallel", default=True, action="store_true",
                        help="Whether to use parallel evaluation.")
    parser.add_argument("--no_parallel", dest="parallel", action="store_false",
                        help="Disable parallel evaluation and process samples one at a time.")
    parser.add_argument("--num_workers", type=int, default=15, help="Number of workers to use for parallel evaluation.")
    parser.add_argument("--eval_type", type=str,
                        choices=["modelnet-close-set-classification", "open-free-form-classification",
                                 "object-captioning"], default="object-captioning")

    args = parser.parse_args()

    # An empty path can never be opened; fail early with a usage message
    # instead of an obscure file error later on.
    if not args.results_path:
        parser.error("--results_path is required.")

    if args.output_dir is None:
        args.output_dir = os.path.dirname(args.results_path)

    # The output name carries the last component of the model name.
    output_file = os.path.basename(args.results_path).replace(".json",
                                                              f"_evaluated_{(args.model_type).split('/')[-1]}.json")

    # if exists, then exit
    if os.path.exists(os.path.join(args.output_dir, output_file)):
        print(f"[INFO] Evaulated results already exists in {os.path.join(args.output_dir, output_file)}.")
        exit()

    start_evaluation(results=args.results_path, output_dir=args.output_dir, output_file=output_file,
                     eval_type=args.eval_type, model_type=args.model_type,
                     parallel=args.parallel, num_workers=args.num_workers)