File size: 6,624 Bytes
80f72f3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
from dataclasses import dataclass
from typing import List, Union, Optional
import numpy as np
from nmtscore import NMTScorer
from scipy.special import softmax
from scipy.stats import permutation_test
@dataclass
class TranslationDirectionResult:
sentence1: Union[str, List[str]]
sentence2: Union[str, List[str]]
lang1: str
lang2: str
raw_prob_1_to_2: float
raw_prob_2_to_1: float
pvalue: Optional[float] = None
@property
def num_sentences(self):
return len(self.sentence1) if isinstance(self.sentence1, list) else 1
@property
def prob_1_to_2(self):
return softmax([self.raw_prob_1_to_2, self.raw_prob_2_to_1])[0]
@property
def prob_2_to_1(self):
return softmax([self.raw_prob_1_to_2, self.raw_prob_2_to_1])[1]
@property
def predicted_direction(self) -> str:
if self.raw_prob_1_to_2 >= self.raw_prob_2_to_1:
return self.lang1 + '→' + self.lang2
else:
return self.lang2 + '→' + self.lang1
def __str__(self):
s = f"""\
Predicted direction: {self.predicted_direction}
{self.num_sentences} sentence pair{"s" if self.num_sentences > 1 else ""}
{self.lang1}→{self.lang2}: {self.prob_1_to_2:.3f}
{self.lang2}→{self.lang1}: {self.prob_2_to_1:.3f}"""
if self.pvalue is not None:
s += f"\np-value: {self.pvalue}\n"
return s
class TranslationDirectionDetector:
def __init__(self, scorer: NMTScorer = None, use_normalization: bool = False):
self.scorer = scorer or NMTScorer()
self.use_normalization = use_normalization
def detect(self,
sentence1: Union[str, List[str]],
sentence2: Union[str, List[str]],
lang1: str,
lang2: str,
return_pvalue: bool = False,
pvalue_n_resamples: int = 9999,
score_kwargs: dict = None
) -> TranslationDirectionResult:
if isinstance(sentence1, list) and isinstance(sentence2, list):
if len(sentence1) != len(sentence2):
raise ValueError("Lists sentence1 and sentence2 must have same length")
if len(sentence1) == 0:
raise ValueError("Lists sentence1 and sentence2 must not be empty")
if len(sentence1) == 1 and return_pvalue:
raise ValueError("return_pvalue=True requires the documents to have multiple sentences")
if lang1 == lang2:
raise ValueError("lang1 and lang2 must be different")
prob_1_to_2 = self.scorer.score_direct(
sentence2, sentence1,
lang2, lang1,
normalize=self.use_normalization,
both_directions=False,
score_kwargs=score_kwargs
)
prob_2_to_1 = self.scorer.score_direct(
sentence1, sentence2,
lang1, lang2,
normalize=self.use_normalization,
both_directions=False,
score_kwargs=score_kwargs
)
pvalue = None
if isinstance(sentence1, list): # document-level
# Compute the average probability per target token, across the complete document
# 1. Convert probabilities back to log probabilities
log_prob_1_to_2 = np.log2(np.array(prob_1_to_2))
log_prob_2_to_1 = np.log2(np.array(prob_2_to_1))
# 2. Reverse the sentence-level length normalization
sentence1_lengths = np.array([self._get_sentence_length(s) for s in sentence1])
sentence2_lengths = np.array([self._get_sentence_length(s) for s in sentence2])
log_prob_1_to_2 = sentence2_lengths * log_prob_1_to_2
log_prob_2_to_1 = sentence1_lengths * log_prob_2_to_1
# 4. Sum up the log probabilities across the document
total_log_prob_1_to_2 = log_prob_1_to_2.sum()
total_log_prob_2_to_1 = log_prob_2_to_1.sum()
# 3. Document-level length normalization
avg_log_prob_1_to_2 = total_log_prob_1_to_2 / sum(sentence2_lengths)
avg_log_prob_2_to_1 = total_log_prob_2_to_1 / sum(sentence1_lengths)
# 4. Convert back to probabilities
prob_1_to_2 = 2 ** avg_log_prob_1_to_2
prob_2_to_1 = 2 ** avg_log_prob_2_to_1
if return_pvalue:
x = np.vstack([log_prob_1_to_2, sentence2_lengths]).T
y = np.vstack([log_prob_2_to_1, sentence1_lengths]).T
result = permutation_test(
data=(x, y),
statistic=self._statistic_token_mean,
permutation_type="samples",
n_resamples=pvalue_n_resamples,
)
pvalue = result.pvalue
else:
if return_pvalue:
raise ValueError("return_pvalue=True requires sentence1 and sentence2 to be lists of sentences")
return TranslationDirectionResult(
sentence1=sentence1,
sentence2=sentence2,
lang1=lang1,
lang2=lang2,
raw_prob_1_to_2=prob_1_to_2,
raw_prob_2_to_1=prob_2_to_1,
pvalue=pvalue,
)
def _get_sentence_length(self, sentence: str) -> int:
tokens = self.scorer.model.tokenizer.tokenize(sentence)
return len(tokens)
@staticmethod
def _statistic_token_mean(x: np.ndarray, y: np.ndarray, axis: int = -1) -> float:
"""
Statistic for scipy.stats.permutation_test
:param x: Matrix of shape (2 x num_sentences). The first row contains the unnormalized log probability
for lang1→lang2, the second row contains the sentence lengths in lang2.
:param y: Same as x, but for lang2→lang1
:return: Difference between lang1→lang2 and lang2→lang1
"""
if axis != -1:
raise NotImplementedError("Only axis=-1 is supported")
# Add batch dim
if x.ndim == 2:
x = x[np.newaxis, ...]
y = y[np.newaxis, ...]
# Sum up the log probabilities across the document
total_log_prob_1_to_2 = x[:, 0].sum(axis=axis)
total_log_prob_2_to_1 = y[:, 0].sum(axis=axis)
# Document-level length normalization
avg_log_prob_1_to_2 = total_log_prob_1_to_2 / x[:, 1].sum(axis=axis)
avg_log_prob_2_to_1 = total_log_prob_2_to_1 / y[:, 1].sum(axis=axis)
# Convert to probabilities
prob_1_to_2 = 2 ** avg_log_prob_1_to_2
prob_2_to_1 = 2 ** avg_log_prob_2_to_1
# Compute difference
return prob_1_to_2 - prob_2_to_1
|