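# GQA evaluation utilities, following the metrics of the official GQA evaluation
# script: accuracy (overall, binary/open, and per structural/semantic type,
# question length, and reasoning steps), validity, plausibility, consistency,
# grounding, and the answer-distribution metric.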
import glob
import json
import os.path
from collections import defaultdict

from mmengine.logging import print_log
from tqdm import tqdm


class eval_gqa:

    def __init__(
        self,
        tier="val",
        scenes="{tier}_sceneGraphs.json",
        questions="{tier}_all_questions.json",
        choices="{tier}_choices.json",
        predictions="{tier}_predictions.json",
        attentions="{tier}_attentions.json",
        consistency=False,
        grounding=False,
        objectFeatures=False,
        mapSize=7,
    ):
        self.tier = tier
        self.consistency = consistency
        self.grounding = grounding
        self.objectFeatures = objectFeatures
        self.mapSize = mapSize
        if not consistency:
            print_log("Please consider using --consistency to compute consistency scores for entailed questions.",
                      'current')
            print_log("If you do so, please provide answers to all questions in val_all_questions.json.\n", 'current')

        if not grounding:
            print_log("Please consider using --grounding to compute attention scores.", 'current')
            print_log("If you do so, please provide attention maps through --attentions.\n", 'current')

        print_log("Loading scene graphs...", 'current')
        try:
            self.scenes = self.loadFile(scenes.format(tier=self.tier))
        except Exception:
            print_log('Failed to load scene graphs -- cannot evaluate grounding', 'current')
            self.scenes = None

        print_log("Loading questions...", 'current')
        self.questions = self.loadFile(questions.format(tier=self.tier))

        print_log("Loading choices...", 'current')
        try:
            self.choices = self.loadFile(choices.format(tier=self.tier))
        except Exception:
            print_log('Failed to load choices -- cannot evaluate validity or plausibility', 'current')
            self.choices = None

        print_log("Loading predictions...", 'current')
        predictions = self.loadFile(predictions.format(tier=tier))
        self.predictions = {p["questionId"]: p["prediction"] for p in predictions}

        for qid in self.questions:
            if (qid not in self.predictions) and (consistency or self.questions[qid]["isBalanced"]):
                print_log("no prediction for question {}. Please add prediction for all questions.".format(qid),
                          'current')
                raise Exception("missing predictions")

        self.attentions = None
        if grounding:
            with open(attentions.format(tier=tier)) as attentionsFile:
                attentions = json.load(attentionsFile)
            self.attentions = {a["questionId"]: a["attention"] for a in attentions}
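
    # Aggregate all metrics over the balanced questions, print a summary, and
    # return the scores dict (overall accuracy, binary/open accuracy, validity,
    # plausibility, consistency, grounding, distribution, plus per-type breakdowns).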
    def forward(self):
        scores = {
            "accuracy": [],
            "binary": [],
            "open": [],
            "validity": [],
            "plausibility": [],
            "consistency": [],
            "accuracyPerStructuralType": defaultdict(list),
            "accuracyPerSemanticType": defaultdict(list),
            "accuracyPerLength": defaultdict(list),
            "accuracyPerSteps": defaultdict(list),
            "grounding": [],
        }

        dist = {"gold": defaultdict(lambda: defaultdict(int)), "predicted": defaultdict(lambda: defaultdict(int))}

        for qid, question in tqdm(self.questions.items()):

            if question["isBalanced"]:
                gold = question["answer"]
                predicted = self.predictions[qid]

                correct = predicted == gold
                score = self.toScore(correct)

                wordsNum = self.getWordsNum(question)
                stepsNum = self.getStepsNum(question)

                scores["accuracy"].append(score)
                scores["accuracyPerLength"][wordsNum].append(score)
                scores["accuracyPerSteps"][stepsNum].append(score)
                scores["accuracyPerStructuralType"][question["types"]["structural"]].append(score)
                scores["accuracyPerSemanticType"][question["types"]["semantic"]].append(score)
                answerType = "open" if question["types"]["structural"] == "query" else "binary"
                scores[answerType].append(score)

                valid = (
                    self.belongs(predicted, self.choices[qid]["valid"], question) if self.choices else False
                )
                scores["validity"].append(self.toScore(valid))

                plausible = (
                    self.belongs(predicted, self.choices[qid]["plausible"], question)
                    if self.choices
                    else False
                )
                scores["plausibility"].append(self.toScore(plausible))

                if self.attentions is not None:
                    groundingScore = self.computeGroundingScore(
                        question, self.scenes[question["imageId"]], self.attentions[qid]
                    )
                    if groundingScore is not None:
                        scores["grounding"].append(groundingScore)

                globalGroup = question["groups"]["global"]
                if globalGroup is not None:
                    dist["gold"][globalGroup][gold] += 1
                    dist["predicted"][globalGroup][predicted] += 1

                if self.consistency:
                    scores = self.updateConsistency(qid, question, self.questions, correct, scores)

        scores["distribution"] = self.chiSquare(dist["gold"], dist["predicted"]) / 100

        metrics = [
            "binary",
            "open",
            "accuracy",
            "consistency",
            "validity",
            "plausibility",
            "grounding",
            "distribution",
        ]

        detailedMetrics = [
            ("accuracyPerStructuralType", "Accuracy / structural type"),
            ("accuracyPerSemanticType", "Accuracy / semantic type"),
            ("accuracyPerSteps", "Accuracy / steps number"),
            ("accuracyPerLength", "Accuracy / words number"),
        ]

        subMetrics = {"attr": "attribute", "cat": "category", "global": "scene", "obj": "object", "rel": "relation"}

        for k in metrics:
            if isinstance(scores[k], list):
                scores[k] = self.avg(scores[k]) * 100

        for k, _ in detailedMetrics:
            for t in scores[k]:
                scores[k][t] = self.avg(scores[k][t]) * 100, len(scores[k][t])

        for m in metrics:

            if m == "grounding" and not self.grounding:
                continue
            if m == "consistency" and not self.consistency:
                continue

            print_log(
                "{title}: {score:.2f}{suffix}".format(
                    title=m.capitalize(),
                    score=scores[m],
                    suffix=" (lower is better)" if m == "distribution" else "%",
                ), 'current')

        for m, mPrintName in detailedMetrics:
            print_log("", 'current')
            print_log("{}:".format(mPrintName), 'current')

            for t in sorted(list(scores[m].keys())):

                tName = t
                if m == "accuracyPerSemanticType":
                    tName = subMetrics.get(t, t).capitalize()

                print_log(
                    "  {title}: {score:.2f}{suffix} ({amount} questions)".format(
                        title=tName, score=scores[m][t][0], suffix="%", amount=scores[m][t][1]
                    ), 'current')
        return scores

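    # Load a JSON file; if the path instead refers to a directory of per-chunk
    # JSON files (e.g. chunked all_questions files), merge all chunks into one dict.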
    def loadFile(self, name):
        if os.path.isfile(name):
            with open(name) as file:
                data = json.load(file)
        elif os.path.isdir(name.split(".")[0]):
            data = {}
            chunks = glob.glob('{dir}/{dir}_*.{ext}'.format(dir=name.split(".")[0], ext=name.split(".")[1]))
            for chunk in chunks:
                with open(chunk) as file:
                    data.update(json.load(file))
        else:
            raise Exception("Can't find {}".format(name))
        return data

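    # Small scoring helpers: toScore maps a boolean to 0/1, avg is a plain mean,
    # and wavg is a weighted mean (returns None when all weights are zero).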
    def toScore(self, b):
        return float(1 if b else 0)

    def avg(self, l):
        if len(l) == 0:
            return 0
        return float(sum(l)) / len(l)

    def wavg(self, l, w):
        if sum(w) == 0:
            return None
        return float(sum(l[i] * w[i] for i in range(len(l)))) / sum(w)

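    # Question-complexity measures: getWordsNum counts words in the question
    # text; getStepsNum counts semantic program steps, ignoring trivial
    # operations (exist, query name, choose name).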
    def getWordsNum(self, question):
        return len(question["question"].split())

    def getStepsNum(self, question):
        return len(
            [
                c
                for c in question["semantic"]
                if not (
                    any(
                        [
                            o in "{}: {}".format(c["operation"], c["argument"])
                            for o in ["exist", "query: name", "choose name"]
                        ]
                    )
                )
            ]
        )

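    # Check whether a predicted answer belongs to a question's valid/plausible
    # answer set; "Common" questions compare against attribute words instead.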
    def belongs(self, element, group, question):
        if "Common" in question["types"]["detailed"]:
            group = ["color", "material", "shape"]

        return element in group

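    # Consistency: for a correctly answered question, measure accuracy over its
    # entailed questions and append the average to scores["consistency"].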
    def updateConsistency(self, questionId, question, questions, correct, scores):
        inferredQuestions = [eid for eid in question["entailed"] if eid != questionId]

        if correct and len(inferredQuestions) > 0:

            consistencyScores = []
            for eid in inferredQuestions:
                gold = questions[eid]["answer"]
                predicted = self.predictions[eid]
                score = self.toScore(predicted == gold)
                consistencyScores.append(score)

            scores["consistency"].append(self.avg(consistencyScores))
        return scores

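    # Box helpers for grounding: boxes are (x0, y0, x1, y1) in [0, 1] image
    # coordinates; intersection returns None for non-overlapping ranges, and
    # intersectionRate is the fraction of c1's area covered by c2.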
    def yrange(self, c):
        return (c[1], c[3])

    def xrange(self, c):
        return (c[0], c[2])

    def length(self, r):
        if r is None:
            return 0
        return float(r[1] - r[0])

    def size(self, c):
        return self.length(self.xrange(c)) * self.length(self.yrange(c))

    def intersection(self, r1, r2):
        ir = (max(r1[0], r2[0]), min(r1[1], r2[1]))
        if ir[1] > ir[0]:
            return ir
        return None

    def intersectionSize(self, c1, c2):
        return self.length(self.intersection(self.xrange(c1), self.xrange(c2))) * self.length(
            self.intersection(self.yrange(c1), self.yrange(c2))
        )

    def intersectionRate(self, c1, c2):
        return float(self.intersectionSize(c1, c2)) / self.size(c1)

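    # getCell maps an attention-map cell (i, j) to its normalized box;
    # getRegion maps a scene-graph object to its normalized bounding box.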
    def getCell(self, i, j):
        edge = float(1) / self.mapSize
        return (edge * i, edge * j, edge * (i + 1), edge * (j + 1))

    def getRegion(self, sceneGraph, objectId):
        obj = sceneGraph["objects"][objectId]
        x0 = float(obj["x"]) / sceneGraph["width"]
        y0 = float(obj["y"]) / sceneGraph["height"]
        x1 = float(obj["x"] + obj["w"]) / sceneGraph["width"]
        y1 = float(obj["y"] + obj["h"]) / sceneGraph["height"]
        return (x0, y0, x1, y1)

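    # Grounding score: gather the ground-truth regions referenced by the
    # question and full answer (plus the whole image for scene-level steps),
    # then sum the attention of each spatial cell weighted by how much of the
    # cell falls inside each region.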
    def computeGroundingScore(self, question, sceneGraph, attentionMap):

        regions = []

        regions += [
            self.getRegion(sceneGraph, pointer) for pointer in question["annotations"]["question"].values()
        ]

        regions += [
            self.getRegion(sceneGraph, pointer) for pointer in question["annotations"]["fullAnswer"].values()
        ]

        if any(("scene" in c) for c in question["semantic"]):
            regions.append((0, 0, 1, 1))

        if self.objectFeatures:
            pass
        else:
            cells = [
                (self.getCell(i, j), attentionMap[i][j])
                for i in range(self.mapSize)
                for j in range(self.mapSize)
            ]

        scores = []
        for region in regions:
            for cell, attention in cells:
                scores.append(attention * self.intersectionRate(cell, region))
        return sum(scores)

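    # Distribution metric: a chi-square-style distance between the gold and
    # predicted answer distributions per global question group, weighted by
    # group size (lower is better).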
    def chiSquare(self, goldDist, predictedDist):
        sumScore, sumOverall = 0, 0

        for group in goldDist:
            score, overall = 0, 0

            for ans in goldDist[group]:
                e = goldDist[group][ans]
                o = predictedDist[group].get(ans, 0)
                score += (float(o - e) ** 2) / e
                overall += goldDist[group][ans]

            sumScore += score * overall
            sumOverall += overall

        avgScore = float(sumScore) / sumOverall

        return avgScore
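

# A minimal usage sketch (not part of the original module): the file names below
# are assumptions following the default "{tier}_*.json" templates; the predictions
# file is a JSON list of {"questionId": ..., "prediction": ...} entries.
#
# evaluator = eval_gqa(
#     tier="testdev",
#     questions="testdev_all_questions.json",
#     predictions="testdev_predictions.json",
# )
# scores = evaluator.forward()
# print(scores["accuracy"])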