import ast
import datetime
import json
import os
import sys
import time
from pathlib import Path

import numpy as np
import openai
import requests
import yaml
from decord import VideoReader, cpu
from loguru import logger as eval_logger
from openai import OpenAI

import lmms_eval.tasks._task_utils.file_utils as file_utils
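
# ActivityNet-QA task utilities for lmms-eval: load the task config, locate cached
# videos, build prompts, and grade predictions with an OpenAI-compatible judge model.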

with open(Path(__file__).parent / "_default_template_yaml", "r") as f:
    raw_data = f.readlines()
    safe_data = []
    for i, line in enumerate(raw_data):
        # Skip lines with custom !function tags, which yaml.safe_load cannot parse.
        if "!function" not in line:
            safe_data.append(line)

    config = yaml.safe_load("".join(safe_data))

NUM_SECONDS_TO_SLEEP = 5

GPT_EVAL_MODEL_NAME = os.getenv("MODEL_VERSION", "gpt-4o-2024-11-20")

API_TYPE = os.getenv("API_TYPE", "openai")

if API_TYPE == "openai":
    API_URL = os.getenv("OPENAI_API_URL", "https://api.openai.com/v1/chat/completions")
    API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_API_KEY")
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }

HF_HOME = os.environ["HF_HOME"]
cache_dir = os.path.join(HF_HOME, config["dataset_kwargs"]["cache_dir"], "all_test")


def activitynetqa_doc_to_visual(doc):
    # Videos are cached as v_<video_name>.<ext>; try the common container formats.
    base_path = os.path.join(cache_dir, f"v_{doc['video_name']}")
    for ext in ["mp4", "webm", "mkv"]:
        video_path = f"{base_path}.{ext}"
        if os.path.exists(video_path):
            return [video_path]
    sys.exit(f"video path: {base_path}.mp4 does not exist, please check")


def activitynetqa_doc_to_text(doc, lmms_eval_specific_kwargs=None):
    if lmms_eval_specific_kwargs is None:
        lmms_eval_specific_kwargs = {}
    pre_prompt = lmms_eval_specific_kwargs.get("pre_prompt", "")
    post_prompt = lmms_eval_specific_kwargs.get("post_prompt", "")

    # Normalize the raw question (capitalize it and append a question mark)
    # before wrapping it with the task-specific prompts.
    raw_question = doc["question"]
    question = raw_question.capitalize() + "?"

    return f"{pre_prompt}{question}{post_prompt}"


def activitynetqa_doc_to_answer(doc):
    return doc["answer"]
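
# GPT-judge helpers: get_eval() asks the chat-completions endpoint configured above to
# grade a prediction against the reference answer, and parse_score() converts the judge's
# dictionary-style reply into a [pred, score] pair.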


def get_eval(question, answer, pred, max_tokens: int, retries: int = 5):
    global headers

    messages = [
        {
            "role": "system",
            "content": "You are an intelligent chatbot designed for evaluating the correctness of generative outputs for question-answer pairs. "
            "Your task is to compare the predicted answer with the correct answer and determine if they match meaningfully. Here's how you can accomplish the task:\n"
            "------\n"
            "##INSTRUCTIONS:\n"
            "- Focus on the meaningful match between the predicted answer and the correct answer.\n"
            "- Consider synonyms or paraphrases as valid matches.\n"
            "- Evaluate the correctness of the prediction compared to the answer.",
        },
        {
            "role": "user",
            "content": f"Please evaluate the following video-based question-answer pair:\n\n"
            f"Question: {question}\n"
            f"Correct Answer: {answer}\n"
            f"Predicted Answer: {pred}\n\n"
            "Provide your evaluation only as a yes/no and score where the score is an integer value between 0 and 5, with 5 indicating the highest meaningful match. "
            "Please generate the response in the form of a Python dictionary string with keys 'pred' and 'score', where the value of 'pred' is a string of 'yes' or 'no' and the value of 'score' is an INTEGER, not a STRING. "
            "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. Only provide the Python dictionary string. "
            "For example, your response should look like this: {'pred': 'yes', 'score': 4}.",
        },
    ]

    payload = {
        "model": GPT_EVAL_MODEL_NAME,
        "messages": messages,
        "temperature": 0,
        "max_tokens": max_tokens,
    }

    last_error = ""
    for attempt in range(retries):
        try:
            response = requests.post(API_URL, headers=headers, json=payload, timeout=60)
            response.raise_for_status()
            try:
                response_data = response.json()
            except requests.exceptions.JSONDecodeError as e:
                last_error = str(e)
                eval_logger.error(f"JSON decode error on attempt {attempt + 1}. Response text: {response.text}")
                continue
            content = response_data["choices"][0]["message"]["content"].strip()
            if content != "":
                return content, response_data["model"]

        except requests.exceptions.HTTPError as e:
            last_error = str(e)
            eval_logger.error(f"HTTP error on attempt {attempt + 1}: {e}")
        except requests.exceptions.RequestException as e:
            last_error = str(e)
            eval_logger.error(f"Request exception on attempt {attempt + 1}: {e}")
        except Exception as e:
            last_error = str(e)
            eval_logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")

        if attempt < retries - 1:
            time.sleep(NUM_SECONDS_TO_SLEEP)
        else:
            # The except-clause variable is cleared when its handler exits, so log the saved message.
            eval_logger.error(f"All {retries} attempts failed. Last error message: {last_error}")
            return "", ""

    return "", ""


def parse_score(review):
    try:
        # Keep only the first {...} block in case the judge wrapped it in extra text.
        review = "{" + review.split("{")[1].split("}")[0] + "}"
        review_dict = ast.literal_eval(review)

        score = int(review_dict["score"])
        pred = review_dict["pred"]
        if "yes" in pred.lower():
            pred = "yes"
        elif "no" in pred.lower():
            pred = "no"

        return [pred, score]
    except SyntaxError as e:
        eval_logger.error(f"Syntax error parsing the review string: {e}. Review content: {review}")
    except ValueError as e:
        eval_logger.error(f"Value error parsing the review string: {e}. Review content: {review}")
    except Exception as e:
        eval_logger.error(f"Unexpected error parsing the review string: {e}. Review content: {review}")
    # Fall back to a failed judgment so callers always receive a [pred, score] pair.
    return ["no", 0]


def activitynetqa_process_results(doc, result):
    """
    Args:
        doc: an instance of the eval dataset
        result: [pred]
    Returns:
        a dictionary holding the per-question judge output for the score and accuracy metrics
    """
    pred = result[0] if result else ""
    try:
        question = doc["question"]
        answer = doc["answer"]

        review, model_name = get_eval(question, answer, pred, 64)
        scores = parse_score(review)
    except Exception as e:
        eval_logger.error(f"Error for Question ID: {doc.get('question_id', 'Unknown')}: {e}")
        review = "Failed to Get a Proper Review."
        model_name = "Failed Request"
        scores = ["no", 0]

    return {
        "gpt_eval_score": {"video_name": doc["video_name"], "question": doc["question"], "answer": doc["answer"], "pred": pred, "question_id": doc["question_id"], "type": doc["type"], "Correctness": scores[0], "score": scores[1]},
        "gpt_eval_accuracy": {"video_name": doc["video_name"], "question": doc["question"], "answer": doc["answer"], "pred": pred, "question_id": doc["question_id"], "type": doc["type"], "Correctness": scores[0], "score": scores[1]},
    }


def activitynetqa_gpt_eval(results, args):
    """
    Score each prediction with the GPT judge and return the results with added
    Correctness and score fields.

    Args:
        results: list of dicts with "video_name", "Q", "A", "pred", "question_id", and "type"
        args: unused here

    Returns:
        list of evaluated result dicts
    """
    evaluated_results = []

    for data_dict in results:
        try:
            question = data_dict.get("Q", "")
            answer = data_dict.get("A", "")
            pred = data_dict.get("pred", "")

            review, model_name = get_eval(question, answer, pred, 64)
            scores = parse_score(review)
        except Exception as e:
            eval_logger.error(f"Error for Question ID: {data_dict.get('question_id', 'Unknown')}: {e}")
            review = "Failed to Get a Proper Review."
            model_name = "Failed Request"
            scores = ["no", 0]

        updated_dict = {"video_name": data_dict["video_name"], "Correctness": scores[0], "score": scores[1], "Q": question, "A": answer, "pred": pred, "question_id": data_dict.get("question_id"), "type": data_dict.get("type")}
        evaluated_results.append(updated_dict)

    return evaluated_results
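
# Aggregation: accuracy is the fraction of judged answers marked "yes"; the score metric
# is the mean 0-5 judge score over all results.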


def activitynetqa_aggregate_score(results, args):
    yes_count = 0
    no_count = 0
    total_score = 0

    for result_dict in results:
        if "yes" in result_dict["Correctness"].lower():
            yes_count += 1
        elif "no" in result_dict["Correctness"].lower():
            no_count += 1

        total_score += int(result_dict["score"])

    accuracy = yes_count / (yes_count + no_count) if (yes_count + no_count) > 0 else 0
    average_score = total_score / len(results) if results else 0
    eval_logger.info(f"Accuracy: {accuracy}")
    eval_logger.info(f"Average Score: {average_score}")
    return average_score


def activitynetqa_aggregate_accuracy(results, args):
    yes_count = 0
    no_count = 0
    total_score = 0

    for result_dict in results:
        if "yes" in result_dict["Correctness"].lower():
            yes_count += 1
        elif "no" in result_dict["Correctness"].lower():
            no_count += 1

        total_score += int(result_dict["score"])

    accuracy = yes_count / (yes_count + no_count) if (yes_count + no_count) > 0 else 0
    average_score = total_score / len(results) if results else 0
    eval_logger.info(f"Accuracy: {accuracy}")
    eval_logger.info(f"Average Score: {average_score}")
    return accuracy * 100