|
import argparse
import ast
import json
import os
import sys
import time

import numpy as np
import openai
from nltk.translate.bleu_score import sentence_bleu
from pycocoevalcap.cider.cider import Cider
from tqdm import tqdm

# Make the MiniGPT4-Video package importable before loading its modules.
sys.path.append('/home/ataallka/minigpt_video/minigpt_multi_img')

from minigpt4.common.config import Config
from minigpt4.common.registry import registry
from minigpt4.datasets.builders import *  # noqa: F401,F403 (registers dataset builders)
from minigpt4.models import *  # noqa: F401,F403 (registers model classes)
from minigpt4.processors import *  # noqa: F401,F403 (registers processors)
from minigpt4.tasks import *  # noqa: F401,F403 (registers tasks)
|
|
|
def eval_parser():
    parser = argparse.ArgumentParser(description="Demo")
    parser.add_argument("--cfg-path", help="path to configuration file.", default="test_configs/llama2_test_config.yaml")
    parser.add_argument("--ckpt", type=str, default='checkpoints/video_llama_checkpoint_last.pth', help="path to checkpoint")
    parser.add_argument("--eval_opt", type=str, default='all', help="which evaluation(s) to run")
    parser.add_argument("--max_new_tokens", type=int, default=512, help="max number of generated tokens")
    parser.add_argument("--lora_r", type=int, default=64, help="lora rank of the model")
    parser.add_argument("--lora_alpha", type=int, default=16, help="lora alpha")
    parser.add_argument(
        "--options",
        nargs="+",
        help="override some settings in the used config; key-value pairs "
        "in xxx=yyy format will be merged into the config file (deprecated), "
        "use --cfg-options instead.",
    )
    return parser
|
|
|
|
|
def prepare_texts(texts, conv_temp, template='<Img><ImageHere></Img>', lengths=None):
    # Build one conversation per input text, prefixing each question with the image placeholder(s).
    convs = [conv_temp.copy() for _ in range(len(texts))]
    if lengths is None:
        for conv, text in zip(convs, texts):
            conv.append_message(conv.roles[0], '{} {}'.format(template, text))
    else:
        # Repeat the placeholder once per image/frame when samples have different lengths.
        templates = [template * length for length in lengths]
        for sample_template, conv, text in zip(templates, convs, texts):
            conv.append_message(conv.roles[0], '{} {}'.format(sample_template, text))
    for conv in convs:
        conv.append_message(conv.roles[1], None)
    texts = [conv.get_prompt() for conv in convs]
    return texts
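# Illustrative call, assuming `conv_temp` is a conversation template object that
# exposes .copy(), .roles, .append_message() and .get_prompt() as used above:
#   prompts = prepare_texts(["What is happening in the video?"], conv_temp)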
|
|
|
|
|
def init_model(args):
    print('Initializing model')
    cfg = Config(args)
    # Propagate CLI overrides into the model config.
    cfg.model_cfg.ckpt = args.ckpt
    cfg.model_cfg.lora_r = args.lora_r
    cfg.model_cfg.lora_alpha = args.lora_alpha

    model_config = cfg.model_cfg
    model_config.low_resource = True
    model_cls = registry.get_model_class(model_config.arch)
    model = model_cls.from_config(model_config).to('cuda:0')

    # Use the training vision processor of the first dataset listed in the config.
    key = list(cfg.datasets_cfg.keys())[0]
    vis_processor_cfg = cfg.datasets_cfg.get(key).vis_processor.train
    print(vis_processor_cfg)
    vis_processor = registry.get_processor_class(vis_processor_cfg.name).from_config(vis_processor_cfg)
    print('Initialization finished')
    return model, vis_processor
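# Typical wiring (sketch): the checkpoint, LoRA rank/alpha and config path all come
# from the CLI defined in eval_parser():
#   model, vis_processor = init_model(eval_parser().parse_args())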
|
|
|
def computeIoU(bbox1, bbox2):
    x1, y1, x2, y2 = bbox1
    x3, y3, x4, y4 = bbox2
    intersection_x1 = max(x1, x3)
    intersection_y1 = max(y1, y3)
    intersection_x2 = min(x2, x4)
    intersection_y2 = min(y2, y4)
    intersection_area = max(0, intersection_x2 - intersection_x1 + 1) * max(0, intersection_y2 - intersection_y1 + 1)
    bbox1_area = (x2 - x1 + 1) * (y2 - y1 + 1)
    bbox2_area = (x4 - x3 + 1) * (y4 - y3 + 1)
    union_area = bbox1_area + bbox2_area - intersection_area
    iou = intersection_area / union_area
    return iou
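# Worked example (boxes are (x1, y1, x2, y2) in inclusive pixel coordinates, hence the
# +1 terms): two 10x10 boxes overlapping in a 5x5 corner share 25 pixels, so
#   computeIoU((0, 0, 9, 9), (5, 5, 14, 14))  # = 25 / (100 + 100 - 25) ≈ 0.143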
|
|
|
def eval_bleu(results):
    bleus1, bleus2, bleus3, bleus4 = [], [], [], []
    for result in tqdm(results, desc="bleu_eval"):
        gt = result['gt'].split()
        pred = result['pred'].split()
        bleus1.append(sentence_bleu([gt], pred, weights=(1, 0, 0, 0)))
        bleus2.append(sentence_bleu([gt], pred, weights=(0.5, 0.5, 0, 0)))
        bleus3.append(sentence_bleu([gt], pred, weights=(0.33, 0.33, 0.33, 0)))
        bleus4.append(sentence_bleu([gt], pred))
    return {'bleu1': np.mean(bleus1), 'bleu2': np.mean(bleus2),
            'bleu3': np.mean(bleus3), 'bleu4': np.mean(bleus4)}
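# Each result only needs plain-text 'gt' and 'pred' fields, e.g.:
#   eval_bleu([{'gt': 'a man is cooking in a kitchen', 'pred': 'a man cooks food'}])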
|
|
|
|
|
cider_scorer = Cider()

def eval_cider(pred_result, gt_result):
    # Both inputs are dicts keyed by video id; they must share the same keys (and key order).
    mean_cider_scores, cider_scores = cider_scorer.compute_score(gt_result, pred_result)
    cider_scores_dict = {}
    for score, pred_vid_id, gt_vid_id in tqdm(zip(cider_scores.tolist(), pred_result, gt_result), desc="cider_eval"):
        assert pred_vid_id == gt_vid_id
        cider_scores_dict[pred_vid_id] = score
    return {'mean_cider_scores': mean_cider_scores, 'cider_scores': cider_scores_dict}
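# Illustrative call: predictions first, ground truth second, both keyed by video id and
# mapping to a list of caption strings (the format the pycocoevalcap scorer expects):
#   eval_cider({'vid_1': ['a man cooks food']}, {'vid_1': ['a man is cooking']})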
|
|
|
|
|
# File containing the OpenAI API key used by the GPT-based metrics below.
openai.api_key_path = "/home/ataallka/chatgpt_api.txt"
|
|
|
|
|
def chat_gpt_eval(results, output_path):
    trial = 0
    gpt_results = []
    avg_chatgpt_score = 0
    existed_files = {}

    # Resume support: collect the per-video scores that earlier runs already wrote to disk.
    for file in os.listdir(output_path):
        if file.endswith(".json"):
            with open(f'{output_path}/{file}') as json_file:
                data = json.load(json_file)
                gpt_results.append(data[0])
                avg_chatgpt_score += float(data[0]['chatgpt_score'])
                existed_files[data[0]['video_name']] = True
    length_output_path = len(os.listdir(output_path))
    # Retry until every result has a saved score file; individual API calls may fail transiently.
    while len(results) != length_output_path:
        for res in tqdm(results, desc="chatgpt_eval"):
            if existed_files.get(res['video_name'], False):
                continue
            video_name = res['video_name']
            sentence_1 = res['A']
            sentence_2 = res['pred']
            try:
                prompt = (
                    "Given these 2 sentences, the first one is the ground-truth description of a video "
                    "and the second sentence is the generated text from a video summarization model, "
                    "give it a score from 0 to 5 to evaluate the model summarization performance. "
                    "The output should be only the score number without any additional information\n"
                    f"first sentence: {sentence_1}\nsecond sentence: {sentence_2}\nscore:"
                )
                response = openai.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {
                            "role": "user",
                            "content": prompt
                        }],
                )
                res['chatgpt_score'] = response.choices[0].message['content']
                out = {'video_name': video_name, 'chatgpt_score': response.choices[0].message['content']}
                gpt_results.append(out)
                # Persist each score immediately so an interrupted run can resume, and mark it as
                # done so later retry passes do not re-score (and double count) this video.
                with open(f'{output_path}/{video_name}.json', 'w') as f:
                    json.dump([out], f)
                existed_files[video_name] = True
                avg_chatgpt_score += float(response.choices[0].message['content'])
            except Exception as e:
                print("chat gpt error", e)
        print("Finished ChatGPT evaluation trial", trial)
        trial += 1
        length_output_path = len(os.listdir(output_path))
    return results, avg_chatgpt_score / len(results)
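# Each entry of `results` must carry 'video_name', the ground-truth answer 'A' and the
# model output 'pred'; one JSON file per video is written to `output_path`, so an
# interrupted run can be resumed by calling chat_gpt_eval() again with the same path.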
|
def GPT4_answer(question, answer, pred):
    try:
        completion = openai.ChatCompletion.create(
            model='gpt-4',
            messages=[
                {
                    "role": "system",
                    "content":
                        "You are an intelligent chatbot designed for evaluating the correctness of generative outputs for question-answer pairs. "
                        "Your task is to compare the predicted answer with the correct answer and determine if they match meaningfully. Here's how you can accomplish the task:"
                        "------"
                        "##INSTRUCTIONS: "
                        "- Focus on the meaningful match between the predicted answer and the correct answer.\n"
                        "- Consider synonyms or paraphrases as valid matches.\n"
                        "- Evaluate the correctness of the prediction compared to the answer."
                },
                {
                    "role": "user",
                    "content":
                        "Please evaluate the following video-based question-answer pair:\n\n"
                        f"Question: {question}\n"
                        f"Correct Answer: {answer}\n"
                        f"Predicted Answer: {pred}\n\n"
                        "Provide your evaluation only as a yes/no and score where the score is an integer value between 0 and 5, with 5 indicating the highest meaningful match. "
                        "Please generate the response in the form of a Python dictionary string with keys 'pred' and 'score', where value of 'pred' is a string of 'yes' or 'no' and value of 'score' is in INTEGER, not STRING."
                        "DO NOT PROVIDE ANY OTHER OUTPUT TEXT OR EXPLANATION. Only provide the Python dictionary string. "
                        "For example, your response should look like this: {'pred': 'yes', 'score': 4.8}."
                }
            ]
        )
        response_message = completion["choices"][0]["message"]["content"]
        response_dict = ast.literal_eval(response_message)
        return response_dict
    except Exception as e:
        print(f"Error : {e}")
        return None
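# GPT4_answer returns a dict parsed from the model reply, e.g. {'pred': 'yes', 'score': 4},
# or None when the API call or the parsing fails.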
|
def GPT4_evaluation(val_result):
    scores = []
    yes_count = 0
    no_count = 0
    for res in val_result:
        gpt_response = GPT4_answer(res['Q'], res['A'], res['pred'])
        if gpt_response is None:
            continue
        try:
            scores.append(float(gpt_response['score']))
            if 'yes' in gpt_response['pred'].lower():
                yes_count += 1
            elif 'no' in gpt_response['pred'].lower():
                no_count += 1
        except Exception:
            # Skip malformed responses (missing keys or non-numeric scores).
            continue
    avg_score = sum(scores) / len(scores)
    accuracy = (yes_count / (yes_count + no_count)) * 100
    print(f"GPT-4 score: {avg_score} accuracy: {accuracy}")
    return avg_score, accuracy
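# Minimal usage sketch (hedged): the real entry points are the benchmark scripts that
# import these helpers; the sample QA dict below is purely illustrative.
#   val_result = [{'Q': 'What is the man doing?', 'A': 'He is cooking.', 'pred': 'A man cooks food.'}]
#   avg_score, accuracy = GPT4_evaluation(val_result)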