Siddhant
Add eval metrics
f78ed8b
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import nltk
import math
import sys
from fractions import Fraction
import warnings
from collections import Counter
from nltk.translate.bleu_score import modified_precision, closest_ref_length, brevity_penalty, SmoothingFunction
import warnings
def corpus_bleu(
list_of_references,
hypotheses,
weights=(0.25, 0.25, 0.25, 0.25),
smoothing_function=None,
auto_reweigh=False,
averaging_mode="geometric",
no_length_penalty=False
):
"""
Calculate a single corpus-level BLEU score (aka. system-level BLEU) for all
the hypotheses and their respective references.
Instead of averaging the sentence level BLEU scores (i.e. marco-average
precision), the original BLEU metric (Papineni et al. 2002) accounts for
the micro-average precision (i.e. summing the numerators and denominators
for each hypothesis-reference(s) pairs before the division).
>>> hyp1 = ['It', 'is', 'a', 'guide', 'to', 'action', 'which',
... 'ensures', 'that', 'the', 'military', 'always',
... 'obeys', 'the', 'commands', 'of', 'the', 'party']
>>> ref1a = ['It', 'is', 'a', 'guide', 'to', 'action', 'that',
... 'ensures', 'that', 'the', 'military', 'will', 'forever',
... 'heed', 'Party', 'commands']
>>> ref1b = ['It', 'is', 'the', 'guiding', 'principle', 'which',
... 'guarantees', 'the', 'military', 'forces', 'always',
... 'being', 'under', 'the', 'command', 'of', 'the', 'Party']
>>> ref1c = ['It', 'is', 'the', 'practical', 'guide', 'for', 'the',
... 'army', 'always', 'to', 'heed', 'the', 'directions',
... 'of', 'the', 'party']
>>> hyp2 = ['he', 'read', 'the', 'book', 'because', 'he', 'was',
... 'interested', 'in', 'world', 'history']
>>> ref2a = ['he', 'was', 'interested', 'in', 'world', 'history',
... 'because', 'he', 'read', 'the', 'book']
>>> list_of_references = [[ref1a, ref1b, ref1c], [ref2a]]
>>> hypotheses = [hyp1, hyp2]
>>> corpus_bleu(list_of_references, hypotheses) # doctest: +ELLIPSIS
0.5920...
The example below show that corpus_bleu() is different from averaging
sentence_bleu() for hypotheses
>>> score1 = sentence_bleu([ref1a, ref1b, ref1c], hyp1)
>>> score2 = sentence_bleu([ref2a], hyp2)
>>> (score1 + score2) / 2 # doctest: +ELLIPSIS
0.6223...
:param list_of_references: a corpus of lists of reference sentences, w.r.t. hypotheses
:type list_of_references: list(list(list(str)))
:param hypotheses: a list of hypothesis sentences
:type hypotheses: list(list(str))
:param weights: weights for unigrams, bigrams, trigrams and so on
:type weights: list(float)
:param smoothing_function:
:type smoothing_function: SmoothingFunction
:param auto_reweigh: Option to re-normalize the weights uniformly.
:type auto_reweigh: bool
:return: The corpus-level BLEU score.
:rtype: float
"""
# Before proceeding to compute BLEU, perform sanity checks.
p_numerators = Counter() # Key = ngram order, and value = no. of ngram matches.
p_denominators = Counter() # Key = ngram order, and value = no. of ngram in ref.
hyp_lengths, ref_lengths = 0, 0
assert len(list_of_references) == len(hypotheses), (
"The number of hypotheses and their reference(s) should be the " "same "
)
# Iterate through each hypothesis and their corresponding references.
for references, hypothesis in zip(list_of_references, hypotheses):
# For each order of ngram, calculate the numerator and
# denominator for the corpus-level modified precision.
for i, _ in enumerate(weights, start=1):
p_i = modified_precision(references, hypothesis, i)
p_numerators[i] += p_i.numerator
p_denominators[i] += p_i.denominator
# Calculate the hypothesis length and the closest reference length.
# Adds them to the corpus-level hypothesis and reference counts.
hyp_len = len(hypothesis)
hyp_lengths += hyp_len
ref_lengths += closest_ref_length(references, hyp_len)
# Calculate corpus-level brevity penalty.
if no_length_penalty and averaging_mode == 'geometric':
bp = 1.0
elif no_length_penalty and averaging_mode == 'arithmetic':
bp = 0.0
else:
assert not no_length_penalty
assert averaging_mode != 'arithmetic', 'Not sure how to apply length penalty when aurithmetic mode'
bp = brevity_penalty(ref_lengths, hyp_lengths)
# Uniformly re-weighting based on maximum hypothesis lengths if largest
# order of n-grams < 4 and weights is set at default.
if auto_reweigh:
if hyp_lengths < 4 and weights == (0.25, 0.25, 0.25, 0.25):
weights = (1 / hyp_lengths,) * hyp_lengths
# Collects the various precision values for the different ngram orders.
p_n = [
Fraction(p_numerators[i], p_denominators[i], _normalize=False)
for i, _ in enumerate(weights, start=1)
]
# Returns 0 if there's no matching n-grams
# We only need to check for p_numerators[1] == 0, since if there's
# no unigrams, there won't be any higher order ngrams.
if p_numerators[1] == 0:
return 0
# If there's no smoothing, set use method0 from SmoothinFunction class.
if not smoothing_function:
smoothing_function = SmoothingFunction().method0
# Smoothen the modified precision.
# Note: smoothing_function() may convert values into floats;
# it tries to retain the Fraction object as much as the
# smoothing method allows.
p_n = smoothing_function(
p_n, references=references, hypothesis=hypothesis, hyp_len=hyp_lengths
)
if averaging_mode == "geometric":
s = (w_i * math.log(p_i) for w_i, p_i in zip(weights, p_n))
s = bp * math.exp(math.fsum(s))
elif averaging_mode == "arithmetic":
s = (w_i * p_i for w_i, p_i in zip(weights, p_n))
s = math.fsum(s)
return s
def sentence_bleu(
references,
hypothesis,
weights=(0.25, 0.25, 0.25, 0.25),
smoothing_function=None,
auto_reweigh=False,
averaging_mode="geometric",
no_length_penalty=False
):
return corpus_bleu(
[references], [hypothesis], weights, smoothing_function, auto_reweigh, averaging_mode, no_length_penalty
)
def get_target_sequences(manifest, ground_truth, to_take=1000):
import json
import pathlib
with open(ground_truth, 'r') as fin:
original_continuations = json.loads(fin.read())
sequence2length = [(k, v[0]) for k, v in original_continuations.items()]
assert all(float(v) >= 6.0 for (_, v) in sequence2length) # 6 seconds
sequence2length.sort(key=lambda x: x[1])
to_take_sequences = set(v[0] for v in sequence2length[:to_take])
to_take_ids = []
with open(manifest, 'r') as f:
f.readline()
for i, line in enumerate(f.readlines()):
seq_id = line.split()[0]
seq_id = pathlib.Path(seq_id).name.split('__')[0]
if seq_id in to_take_sequences:
to_take_ids.append(i)
print(f'Took {len(to_take_ids)} ids')
return set(to_take_ids)
def get_self_bleu(utterances, averaging_mode, weights):
self_bleu = []
for i in range(len(utterances)):
hypo = utterances[i]
rest = utterances[:i] + utterances[i+1:]
self_bleu.append(sentence_bleu(rest, hypo, weights,
no_length_penalty=True, averaging_mode=averaging_mode))
return self_bleu
def get_self_bleu2_arithmetic(utterances):
weights = (0.5, 0.5) # equal weight for unigrams and bigrams
return get_self_bleu(utterances, averaging_mode='arithmetic', weights=weights)
def get_self_bleu2_geometric(utterances):
weights = (0.5, 0.5)
return get_self_bleu(utterances, averaging_mode='geometric', weights=weights)
def get_auto_bleu2_arithmetic(utterances):
weights = (0.5, 0.5)
return [auto_bleu(u, mean_mode='arithmetic', weights=weights) for u in utterances]
def get_auto_bleu2_geometric(utterances):
weights = (0.5, 0.5)
return [auto_bleu(u, mean_mode='geometric', weights=weights) for u in utterances]
def get_auto_bleu3_geometric(utterances):
weights = (1./3, 1./3, 1./3)
return [auto_bleu(u, mean_mode='geometric', weights=weights) for u in utterances]
def get_auto_bleu3_arithmetic(utterances):
weights = (1./3, 1./3, 1./3)
return [auto_bleu(u, mean_mode='arithmetic', weights=weights) for u in utterances]
def get_self_bleu3_arithmetic(utterances):
weights = (1./3, 1./3, 1./3)
return get_self_bleu(utterances, averaging_mode='arithmetic', weights=weights)
def get_self_bleu3_geometric(utterances):
weights = (1./3, 1./3, 1./3)
return get_self_bleu(utterances, averaging_mode='geometric', weights=weights)
def auto_bleu(sentence, weights, mean_mode='arithmetic'):
if len(sentence) <= 1:
return 0
N = len(weights)
bleu_n = np.zeros([N])
for n in range(N):
targ_ngrams = list(nltk.ngrams(sentence, n+1))
for p in range(len(targ_ngrams)):
left = sentence[:p]
right = sentence[(p+n+1):]
rest_ngrams = list(nltk.ngrams(left, n+1)) + \
list(nltk.ngrams(right, n+1))
# compute the nb of matching ngrams
bleu_n[n] += targ_ngrams[p] in rest_ngrams
bleu_n[n] /= len(targ_ngrams) # average them to get a proportion
weights = np.array(weights)
if mean_mode == 'arithmetic':
return (bleu_n * weights).sum()
elif mean_mode == 'geometric':
return (bleu_n ** weights).prod()
else:
raise ValueError(f'Unknown agggregation mode {mean_mode}')
def run_f(task_params):
f, terms = task_params
return f(terms)