import ast
import datetime
import json
import os
import sys
import time
from pathlib import Path
import numpy as np
import openai
import requests
import yaml
from decord import VideoReader, cpu
from loguru import logger as eval_logger
from openai import OpenAI
import lmms_eval.tasks._task_utils.file_utils as file_utils
with open(Path(__file__).parent / "_default_template_yaml", "r") as f:
raw_data = f.readlines()
safe_data = []
for i, line in enumerate(raw_data):
    # skip "!function" lines, since yaml.safe_load cannot handle the custom constructor
if "!function" not in line:
safe_data.append(line)
config = yaml.safe_load("".join(safe_data))
NUM_SECONDS_TO_SLEEP = 5
GPT_EVAL_MODEL_NAME = os.getenv("MODEL_VERSION", "gpt-4o-2024-11-20")
API_TYPE = os.getenv("API_TYPE", "openai")
if API_TYPE == "openai":
API_URL = os.getenv("OPENAI_API_URL", "https://api.openai.com/v1/chat/completions")
API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_API_KEY")
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
# Resolve the video cache directory under HF_HOME (the dataset archives are extracted there)
HF_HOME = os.environ["HF_HOME"]
cache_dir = config["dataset_kwargs"]["cache_dir"]
cache_dir = os.path.join(HF_HOME, cache_dir)
cache_dir = os.path.join(cache_dir, "all_test")
# Resolve the on-disk video path for a doc
# This only works correctly with the video dataset
def activitynetqa_doc_to_visual(doc):
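    """Return a one-element list with the video path for `doc`, trying mp4/webm/mkv extensions; exits if no file is found."""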
video_path = os.path.join(cache_dir, f"v_{doc['video_name']}.mp4")
extensions = ["mp4", "webm", "mkv"]
for ext in extensions:
modified_path = video_path.replace("mp4", ext)
if os.path.exists(modified_path):
return [modified_path]
sys.exit(f"video path:{video_path} does not exist, please check")
# This is where the question text is formatted into the final prompt
def activitynetqa_doc_to_text(doc, lmms_eval_specific_kwargs=None):
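    """Format the question prompt for `doc`, wrapping it with optional pre/post prompts from the task config."""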
if lmms_eval_specific_kwargs is None:
lmms_eval_specific_kwargs = {}
pre_prompt = ""
post_prompt = ""
if "pre_prompt" in lmms_eval_specific_kwargs:
pre_prompt = lmms_eval_specific_kwargs["pre_prompt"]
if "post_prompt" in lmms_eval_specific_kwargs:
post_prompt = lmms_eval_specific_kwargs["post_prompt"]
raw_question = doc["question"]
question = raw_question.capitalize() + "?"
# type_specific_prompts = {
# '3': "Please answer with 'yes' or 'no'.",
# '4': "Please state the color as a single word.",
# '7': "Please give the numerical answer."
# }
# doc_type = str(doc['type'])
# type_specific_prompt = type_specific_prompts.get(doc_type, "")
# return f"{pre_prompt}{question} {type_specific_prompt}{post_prompt}"
return f"{pre_prompt}{question}{post_prompt}"
def activitynetqa_doc_to_answer(doc):
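    """Return the ground-truth answer for `doc`."""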
return doc["answer"]
def get_eval(question, answer, pred, max_tokens: int, retries: int = 5):
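    """Ask the GPT judge whether the predicted answer matches the ground-truth answer.

    Posts a chat-completion request to API_URL, retrying up to `retries` times, and
    returns (review content, judge model name), or ("", "") if every attempt fails.
    """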
global headers
messages = [
{
"role": "system",
"content": "You are an intelligent chatbot designed for evaluating the correctness of generative outputs for question-answer pairs. "
"Your task is to compare the predicted answer with the correct answer and determine if they match meaningfully. Here's how you can accomplish the task:"
"------"
"##INSTRUCTIONS: "
"- Focus on the meaningful match between the predicted answer and the correct answer.\n"
"- Consider synonyms or paraphrases as valid matches.\n"
"- Evaluate the correctness of the prediction compared to the answer.",
},
{
"role": "user",
"content": f"Please evaluate the following video-based question-answer pair:\n\n"
f"Question: {question}\n"
f"Correct Answer: {answer}\n"
f"Predicted Answer: {pred}\n\n"
"Provide your evaluation only as a yes/no and score where the score is an integer value between 0 and 5, with 5 indicating the highest meaningful match. "
"Please generate the response in the form of a Python dictionary string with keys 'pred' and 'score', where value of 'pred' is a string of 'yes' or 'no' and value of 'score' is in INTEGER, not STRING."
"DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. Only provide the Python dictionary string. "
"For example, your response should look like this: {'pred': 'yes', 'score': 4.8}.",
},
]
payload = {
"model": GPT_EVAL_MODEL_NAME,
"messages": messages,
"temperature": 0,
"max_tokens": max_tokens,
}
    last_error = None
    for attempt in range(retries):
try:
response = requests.post(API_URL, headers=headers, json=payload, timeout=60)
response.raise_for_status() # Raises HTTPError for bad responses
try:
response_data = response.json() # Attempt to parse JSON
except requests.exceptions.JSONDecodeError:
eval_logger.error(f"JSON decode error on attempt {attempt + 1}. Response text: {response.text}")
continue # Skip to next retry
content = response_data["choices"][0]["message"]["content"].strip()
if content != "":
return content, response_data["model"]
        # Handle HTTP errors separately
        except requests.exceptions.HTTPError as e:
            last_error = e
            eval_logger.error(f"HTTP error on attempt {attempt + 1}: {e}")
        # Handle other requests-related errors
        except requests.exceptions.RequestException as e:
            last_error = e
            eval_logger.error(f"Request exception on attempt {attempt + 1}: {e}")
        # Handle other unexpected errors
        except Exception as e:
            last_error = e
            eval_logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
        if attempt < retries - 1:
            time.sleep(NUM_SECONDS_TO_SLEEP)
        else:  # If this was the last attempt, log and return empty
            eval_logger.error(f"All {retries} attempts failed. Last error: {last_error}")
            return "", ""
    return "", ""
def parse_score(review):
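    """Parse the judge's review string into [pred, score], e.g. "{'pred': 'yes', 'score': 4}" -> ["yes", 4].

    Returns ["no", 0] if the review cannot be parsed.
    """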
try:
# Convert the string representation of a dictionary to an actual dictionary
review = "{" + review.split("{")[1].split("}")[0] + "}"
review_dict = ast.literal_eval(review)
score_match = review_dict["score"]
score = int(score_match)
pred = review_dict["pred"]
if "yes" in pred.lower():
pred = "yes"
elif "no" in pred.lower():
pred = "no"
# pred = review_dict.get("pred", "no")
# score = review_dict.get("score", 0)
return [pred, score]
    except SyntaxError as e:
        eval_logger.error(f"Syntax error parsing the review string: {e}. Review content: {review}")
    except ValueError as e:
        eval_logger.error(f"Value error parsing the review string: {e}. Review content: {review}")
    except Exception as e:
        eval_logger.error(f"Unexpected error parsing the review string: {e}. Review content: {review}")
    # Fall back to a default so callers always receive a [pred, score] pair
    return ["no", 0]
def activitynetqa_process_results(doc, result):
"""
    Args:
        doc: an instance of the eval dataset
        result: [pred], the model's prediction wrapped in a single-element list
    Returns:
        a dictionary keyed by metric name for aggregation
"""
    pred = result[0] if result else ""
    try:
        question = doc["question"]
        answer = doc["answer"]
# Assume get_eval returns a review and the model name, and parse_score parses this review
review, model_name = get_eval(question, answer, pred, 64)
scores = parse_score(review)
except Exception as e:
eval_logger.error(f"Error for Question ID: {doc.get('question_id', 'Unknown')}: {e}")
review = "Failed to Get a Proper Review."
model_name = "Failed Request"
scores = ["no", 0]
return {
"gpt_eval_score": {"video_name": doc["video_name"], "question": doc["question"], "answer": doc["answer"], "pred": pred, "question_id": doc["question_id"], "type": doc["type"], "Correctness": scores[0], "score": scores[1]},
"gpt_eval_accuracy": {"video_name": doc["video_name"], "question": doc["question"], "answer": doc["answer"], "pred": pred, "question_id": doc["question_id"], "type": doc["type"], "Correctness": scores[0], "score": scores[1]},
}
def activitynetqa_gpt_eval(results, args):
"""
    Score each prediction in `results` with the GPT judge and return the results
    with added correctness and score fields.
    Args:
        results: list of result dicts containing "Q", "A", and "pred" entries
        args: evaluation arguments (unused here)
    Returns:
        list of dicts with added "Correctness" and "score" fields
"""
evaluated_results = []
# Process each result to generate scores
for data_dict in results:
        question = data_dict.get("Q", "")
        answer = data_dict.get("A", "")
        pred = data_dict.get("pred", "")
        try:
# Assume get_eval returns a review and the model name, and parse_score parses this review
review, model_name = get_eval(question, answer, pred, 64)
scores = parse_score(review)
except Exception as e:
eval_logger.error(f"Error for Question ID: {data_dict.get('question_id', 'Unknown')}: {e}")
review = "Failed to Get a Proper Review."
model_name = "Failed Request"
scores = ["no", 0]
# Update the dictionary with the new entries
updated_dict = {"video_name": data_dict["video_name"], "Correctness": scores[0], "score": scores[1], "Q": question, "A": answer, "pred": pred, "question_id": data_dict.get("question_id"), "type": data_dict.get("type")}
evaluated_results.append(updated_dict)
return evaluated_results
# Aggregation functions for the two reported metrics (average score and accuracy)
def activitynetqa_aggregate_score(results, args):
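    """Aggregate per-question judge scores into a mean score; accuracy is also logged for reference."""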
yes_count = 0
no_count = 0
total_score = 0
# Iterate over the results to count correctness and sum scores
for result_dict in results:
if "yes" in result_dict["Correctness"].lower():
yes_count += 1
elif "no" in result_dict["Correctness"].lower():
no_count += 1
total_score += int(result_dict["score"])
# Calculate accuracy and average score
accuracy = yes_count / (yes_count + no_count) if (yes_count + no_count) > 0 else 0
average_score = total_score / len(results) if results else 0
eval_logger.info(f"Accuracy: {accuracy}")
eval_logger.info(f"Average Score: {average_score}")
return average_score
def activitynetqa_aggregate_accuracy(results, args):
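    """Aggregate per-question yes/no correctness into an accuracy percentage (0-100); the mean score is also logged for reference."""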
yes_count = 0
no_count = 0
total_score = 0
# Iterate over the results to count correctness and sum scores
for result_dict in results:
if "yes" in result_dict["Correctness"].lower():
yes_count += 1
elif "no" in result_dict["Correctness"].lower():
no_count += 1
total_score += int(result_dict["score"])
# Calculate accuracy and average score
accuracy = yes_count / (yes_count + no_count) if (yes_count + no_count) > 0 else 0
average_score = total_score / len(results) if results else 0
eval_logger.info(f"Accuracy: {accuracy}")
eval_logger.info(f"Average Score: {average_score}")
return accuracy * 100