# Evaluation code for GQA. # Computes a suite of metrics such as accuracy, consistency, plausibility and scores per question type and length. # Visit https://gqadataset.org/ for all information about the dataset, including examples, visualizations, paper and slides. # # # Metrics: # - Accuracy: Standard accuracy, computed over the balanced version of the dataset, which is more robust against # cheating by making educated guesses. For each question-answer pair (q,a), we give 1 point if the # predicted answer p matches a and 0 otherwise, and average over all questions in the dataset. # # - Consistency: A metric for the level of model's consistency across different questions. For each question-answer # pair (q,a), we define a set Eq={q1, q2, ..., qn} of entailed questions, the answers to which can # be unambiguously inferred given (q,a). # Denote Q the set of all questions the model answered correctly. For each question q in Q, we # measure the model's accuracy over the entailed questions Eq to get the score sq and finally # average these results across all questions in Q. # # - Validity: Measures whether the model gives a "valid" answer - one that can theoretically be an answer # to the question (e.g. a color to a color question, yes/no to a binary question etc.). # We provide a set of valid answers to each questions over the final answer vocabulary, in # the choices file, and use it to compute average validity across the dataset. # # - Plausibility: Measures whether the model answers are plausible, e.g. one that make sense in the real world, # e.g. not answering "purple" to a question about apple color (unless it's really purple). # We provide a set of all plausible answers to each questions, computed by looking at all # attributes and relations hold for various objects throughout the whole dataset scene graphs, # and use it to compute average model plausibility across the data. # # - Grounding: Only for attention models. Measures whether the model looks at the relevant regions in the # image when answering a question. Each question in the dataset is annotated with the visual regions # they refer to, which are then used to compute the level to which the model has a correct visual attention, # which will allow to identify whether it really answers based on the image of by language-based guesses. # Supports both spatial features and object-based features. # # - Distribution: Measures the overall match between the true answer distribution for different questions, # vs the overall distribution predicted by the model through its answers for all the data. # We use chi-square statistic to measure the degree of similarity between the distributions, # giving indication to the level of overall world-knowledge of the model # # - Accuracy per type: accuracy per question structural types (logic, compare, choose), and semantic type # (questions about attributes, relations, categories, objects or the whole scene). # # - Accuracy for length: accuracy as a function of the question length, in terms of (1) words number, and semantic # complexity - number of reasoning steps. # # We may support additional metrics (e.g. coverage) in the future. # # # Files format: # - predictions file format: JSON array: [{"questionId": str, "prediction": str}] # - attentions file format: JSON array: # Spatial attention: [{"questionId": str, "attention": [mapSize x mapSize: float] }]. # Object-based attention:[{"questionId": str, "attention": [[x0, y0, x1, y1, float] x #regions] }]. 0 < x,y < 1. # - questions and choices files are provided as part of the dataset. # see https://gqadataset.org/download.html for information about their format. # # # If you have any questions or comments, please feel free to send an email, # at dorarad@cs.stanford.edu. We hope you'll enjoy using the GQA dataset! :) # # # import torch.nn as nn from collections import defaultdict from tqdm import tqdm import os.path import glob import json from mmengine.logging import print_log ##### Arguments ########################################################################################## class eval_gqa(): def __init__( self, tier="val", scenes="{tier}_sceneGraphs.json", questions="{tier}_all_questions.json", choices="{tier}_choices.json", predictions="{tier}_predictions.json", attentions="{tier}_attentions.json", consistency=False, grounding=False, objectFeatures=False, mapSize=7, ): self.consistency= consistency self.grounding = grounding self.objectFeatures = objectFeatures self.mapSize = mapSize if not consistency: print_log("Please consider using --consistency to compute consistency scores for entailed questions.") print_log("If you do so, please provide answers to all questions in val_all_questions.json.\n") if not grounding: print_log("Please consider using --grounding to compute attention scores.") print_log("If you do so, please provide attention maps through --attentions.\n") ##### Files Loading ########################################################################################## # # Load scene graphs # print_log("Loading scene graphs...") # try: # self.scenes = self.loadFile(scenes.format(tier=self.tier)) # except: # print_log('Failed to load scene graphs -- cannot evaluate grounding') # self.scenes = None # for testdev # Load questions print_log("Loading questions...") print(questions) self.questions = self.loadFile(questions) # # Load choices # print_log("Loading choices...") # try: # self.choices = self.loadFile(choices.format(tier=self.tier)) # except: # print_log('Failed to load choices -- cannot evaluate validity or plausibility') # self.choices = None # for testdev # Load predictions and turn them into a dictionary print_log("Loading predictions...") predictions = self.loadFile(predictions.format(tier=tier)) self.predictions = {p["questionId"]: p["prediction"] for p in predictions} # Make sure all question have predictions for qid in self.questions: if (qid not in self.predictions) and (consistency or self.questions[qid]["isBalanced"]): print_log("no prediction for question {}. Please add prediction for all questions.".format(qid)) raise Exception("missing predictions") # Load attentions and turn them into a dictionary self.attentions = None if grounding: with open(attentions.format(tier=tier)) as attentionsFile: attentions = json.load(attentionsFile) self.attentions = {a["questionId"]: a["attention"] for a in attentions} def forward(self): # Initialize data structure to track all metrics: e.g. accuracy, validity and plausibility, as well as # accuracy per question type, length and number of reasoning steps. scores = { "accuracy": [], # list of accuracies per question (1 if correct else 0). Will be averaged ultimately. "binary": [], # list of accuracies per a binary question (1 if correct else 0). Will be averaged ultimately. "open": [], # list of accuracies per an open question (1 if correct else 0). Will be averaged ultimately. "validity": [], # list of validity per question (1 if valid else 0). "plausibility": [], # list of plausibility per question (1 if plausible else 0). "consistency": [], # list of consistency scores for entailed questions. "accuracyPerStructuralType": defaultdict(list), # list of question accuracies for each structural type (e.g. compare, logic questions). "accuracyPerSemanticType": defaultdict(list), # list of question accuracies for each semantic type (e.g. questions about an object, an attribute, a relation). "accuracyPerLength": defaultdict(list), # list of question accuracies per question's word number. "accuracyPerSteps": defaultdict(list), # list of question accuracies per question's reasoning length (steps number). "grounding": [], # list of grounding scores for each question. } # Initialize golden and predicted histograms per each question group. Used to compute the distribution metric. dist = {"gold": defaultdict(lambda: defaultdict(int)), "predicted": defaultdict(lambda: defaultdict(int))} ##### Main score computation ########################################################################################## # Loop over the questions and compute mterics for qid, question in tqdm(self.questions.items()): # Compute scores over the balanced dataset (more robust against cheating by making educated guesses) if question["isBalanced"]: gold = question["answer"] predicted = self.predictions[qid] correct = predicted == gold score = self.toScore(correct) wordsNum = self.getWordsNum(question) stepsNum = self.getStepsNum(question) # Update accuracy scores["accuracy"].append(score) scores["accuracyPerLength"][wordsNum].append(score) scores["accuracyPerSteps"][stepsNum].append(score) scores["accuracyPerStructuralType"][question["types"]["structural"]].append(score) scores["accuracyPerSemanticType"][question["types"]["semantic"]].append(score) answerType = "open" if question["types"]["structural"] == "query" else "binary" scores[answerType].append(score) # # Update validity score # valid = ( # self.belongs(predicted, self.choices[qid]["valid"], question) if self.choices else False # ) # scores["validity"].append(self.toScore(valid)) # # Update plausibility score # plausible = ( # self.belongs(predicted, self.choices[qid]["plausible"], question) # if self.choices # else False # ) # scores["plausibility"].append(self.toScore(plausible)) # # Optionally compute grounding (attention) score # if self.attentions is not None: # groundingScore = self.computeGroundingScore( # question, self.scenes[question["imageId"]], self.attentions[qid] # ) # if groundingScore is not None: # scores["grounding"].append(groundingScore) # Update histograms for gold and predicted answers globalGroup = question["groups"]["global"] if globalGroup is not None: dist["gold"][globalGroup][gold] += 1 dist["predicted"][globalGroup][predicted] += 1 # if self.consistency: # # Compute consistency (for entailed questions) # scores = self.updateConsistency(qid, question, self.questions, correct, scores) # Compute distribution score scores["distribution"] = self.chiSquare(dist["gold"], dist["predicted"]) / 100 # Average scores over all questions (in the balanced dataset) and print_log scores metrics = [ "binary", "open", "accuracy", "consistency", "validity", "plausibility", "grounding", "distribution", ] detailedMetrics = [ ("accuracyPerStructuralType", "Accuracy / structural type"), ("accuracyPerSemanticType", "Accuracy / semantic type"), ("accuracyPerSteps", "Accuracy / steps number"), ("accuracyPerLength", "Accuracy / words number"), ] subMetrics = {"attr": "attribute", "cat": "category", "global": "scene", "obj": "object", "rel": "relation"} # average for k in metrics: if isinstance(scores[k], list): scores[k] = self.avg(scores[k]) * 100 for k, _ in detailedMetrics: for t in scores[k]: scores[k][t] = self.avg(scores[k][t]) * 100, len(scores[k][t]) # print_log print_log("") for m in metrics: # skip grounding and consistency scores if not requested if m == "grounding" and not self.grounding: continue if m == "consistency" and not self.consistency: continue # print_log score print_log( "{title}: {score:.2f}{suffix}".format( title=m.capitalize(), score=scores[m], suffix=" (lower is better)" if m == "distribution" else "%", ), 'current', ) for m, mPrintName in detailedMetrics: print_log("") # print_log metric title print_log("{}:".format(mPrintName), 'current') for t in sorted(list(scores[m].keys())): # set sub-metric title tName = t if isinstance(scores[k], list): tName = subMetrics.get(t, t).capitalize() # print_log score print_log( " {title}: {score:.2f}{suffix} ({amount} questions)".format( title=tName, score=scores[m][t][0], suffix="%", amount=scores[m][t][1] ), 'current', ) def loadFile(self, name): # load standard json file if os.path.isfile(name): with open(name) as file: data = json.load(file) # load file chunks if too big elif os.path.isdir(name.split(".")[0]): data = {} chunks = glob.glob('{dir}/{dir}_*.{ext}'.format(dir=name.split(".")[0], ext=name.split(".")[1])) for chunk in chunks: with open(chunk) as file: data.update(json.load(file)) else: raise Exception("Can't find {}".format(name)) return data ##### Scores data structures initialization ########################################################################################## # book to float def toScore(self, b): return float(1 if b else 0) # Compute average of a list def avg(self, l): if len(l) == 0: return 0 return float(sum(l)) / len(l) def wavg(self, l, w): if sum(w) == 0: return None return float(sum(l[i] * w[i] for i in range(len(l)))) / sum(w) ##### Question lengths - words numbers and reasoning steps number ########################################################################################## # Compute question length (words number) def getWordsNum(self, question): return len(question["question"].split()) # Compute number of reasoning steps (excluding the final "querying" step which doesn't increase effective reasoning length) def getStepsNum(self, question): return len( [ c for c in question["semantic"] if not ( any( [ o in "{}: {}".format(c["operation"], c["argument"]) for o in ["exist", "query: name", "choose name"] ] ) ) ] ) ##### Functions for question annotations ########################################################################################## # # Utility function for converting question annotations string keys to slices # def toSlice(strSlice): # sliceLims = (int(n) for n in strSlice.split(':')) # return apply(slice, sliceLims) # # Utility function for converting question annotations string keys to indexes list: # # "1" => [0] # # "1:3" => [1, 2] # # "4:9:2" => [4, 6, 8] # def intsFromSlice(strSlice): # slice_obj = get_slice_obj(slicearg) # return range(slice_obj.start or 0, slice_obj.stop or -1, slice_obj.step or 1) ##### Functions for validity and plausibility ########################################################################################## def belongs(self, element, group, question): # normalization () if "Common" in question["types"]["detailed"]: group = ["color", "material", "shape"] return element in group ##### Functions for consistency scores (for entailed questions ("inferred")) ########################################################################################## def updateConsistency(self, questionId, question, questions, correct, scores): inferredQuestions = [eid for eid in question["entailed"] if eid != questionId] if correct and len(inferredQuestions) > 0: cosnsitencyScores = [] for eid in inferredQuestions: gold = questions[eid]["answer"] predicted = self.predictions[eid] score = self.toScore(predicted == gold) cosnsitencyScores.append(score) scores["consistency"].append(self.avg(cosnsitencyScores)) return scores ##### Functions for grounding score (optional, only for attention models) ########################################################################################## # Utility functions for working with bounding boxes. # c = (x0, y0, x1, y1), r = (r0, r1) def yrange(self, c): return (c[1], c[3]) def xrange(self, c): return (c[0], c[2]) def length(self, r): if r is None: return 0 return float(r[1] - r[0]) def size(self, c): return self.length(self.xrange(c)) * self.length(self.yrange(c)) def intersection(self, r1, r2): ir = (max(r1[0], r2[0]), min(r1[1], r2[1])) if ir[1] > ir[0]: return ir return None def intersectionSize(self, c1, c2): return self.length(self.intersection(self.xrange(c1), self.xrange(c2))) * self.length( self.intersection(self.yrange(c1), self.yrange(c2)) ) def intersectionRate(self, c1, c2): return float(self.intersectionSize(c1, c2)) / self.size(c1) # Get spatial cell def getCell(self, i, j): edge = float(1) / self.mapSize return (edge * i, edge * j, edge * (i + 1), edge * (j + 1)) # Get bounding box of objectId in sceneGraph def getRegion(self, sceneGraph, objectId): obj = sceneGraph["objects"][objectId] x0 = float(obj["x"]) / sceneGraph["width"] y0 = float(obj["y"]) / sceneGraph["height"] x1 = float(obj["x"] + obj["w"]) / sceneGraph["width"] y1 = float(obj["y"] + obj["h"]) / sceneGraph["height"] return (x0, y0, x1, y1) # Compute grounding score. Computer amount of attention (probability) given to each of the regions # the question and answers refer to. def computeGroundingScore(self, question, sceneGraph, attentionMap): ## prepare gold regions regions = [] # add question regions regions += [ self.getRegion(sceneGraph, pointer) for pointer in question["annotations"]["question"].values() ] # add answer regions regions += [ self.getRegion(sceneGraph, pointer) for pointer in question["annotations"]["fullAnswer"].values() ] # add all the image if the question refers to the whole scene if any(("scene" in c) for c in question["semantic"]): regions.append((0, 0, 1, 1)) # prepare attention map if self.objectFeatures: cells = [((x0, y0, x1, y1), attention) for x0, y0, x1, y1, attention in cells] else: cells = [ (self.getCell(i, j), attentionMap[i][j]) for i in range(self.mapSize) for j in range(self.mapSize) ] # compare attention map to gold regions scores = [] for region in regions: for cell, attention in cells: scores.append(attention * self.intersectionRate(cell, region)) return sum(scores) ##### Functions for distribution score ########################################################################################## # Compute chi square statistic of gold distribution vs predicted distribution, # averaged over all question groups def chiSquare(self, goldDist, predictedDist): sumScore, sumOverall = 0, 0 for group in goldDist: score, overall = 0, 0 for ans in goldDist[group]: e = goldDist[group][ans] o = predictedDist[group].get(ans, 0) score += (float(o - e) ** 2) / e overall += goldDist[group][ans] sumScore += score * overall sumOverall += overall avgScore = float(sumScore) / sumOverall return avgScore