"""Score the temporal order (before/after) of two sentences with a fill-mask model."""

import string

import numpy as np
import spacy
import transformers

# Characters that `_remove_punct` is allowed to strip from the end of a sentence.
_TRAILING_PUNCT = tuple(string.punctuation)


class TempPredictor:
    """Estimate whether one event sentence happens before or after another.

    Two sentences are joined around the tokenizer's mask token
    (``"<e1 without final punctuation> <mask> <e2 with lowercased first char>"``)
    and a ``fill-mask`` pipeline is asked for the most likely fillers. The
    scores given to the fillers ``"before"`` and ``"after"`` are read off in
    both sentence orders and averaged.
    """

    def __init__(self, model, tokenizer, device, spacy_model="en_core_web_sm"):
        """Put the model in eval mode on ``device`` and build the pipeline.

        Args:
            model: Masked-language model; must support ``.to()`` and ``.eval()``.
            tokenizer: Tokenizer matching ``model``; its ``mask_token`` is used
                to build the prompts.
            device: Device the model is moved to. The same value is forwarded
                to ``transformers.pipeline`` so that the pipeline does not
                relocate the model to a different device.
            spacy_model: Name of the spaCy model to load. If loading fails,
                ``"en_core_web_sm"`` is loaded instead.
        """
        self._model = model
        self._model.to(device)
        self._model.eval()
        self._tokenizer = tokenizer
        self._mtoken = self._tokenizer.mask_token
        # Forward the caller's device: a hard-coded GPU index would override
        # the placement done above and fail on machines without that GPU.
        self.unmasker = transformers.pipeline(
            "fill-mask",
            model=self._model,
            tokenizer=self._tokenizer,
            device=device,
        )
        try:
            self._spacy = spacy.load(spacy_model)
        except Exception as e:
            # Deliberately broad (best-effort fallback). Report before retrying
            # so the reason is visible even if the fallback load fails as well.
            print(f"Failed to load spacy model {spacy_model}, use default 'en_core_web_sm'\n{e}")
            self._spacy = spacy.load("en_core_web_sm")

    def _extract_token_prob(self, arr, token, crop=1):
        """Return the score of ``token`` among fill-mask candidates.

        Args:
            arr: Iterable of dicts, each with ``"token_str"`` and ``"score"``.
            token: Filler word to look for, e.g. ``"before"``.
            crop: Number of leading characters dropped from each candidate's
                ``"token_str"`` before comparing. Presumably this skips a
                word-boundary prefix added by the tokenizer -- confirm against
                the tokenizer in use.

        Returns:
            The ``"score"`` of the first matching candidate, or ``0.`` if no
            candidate matches.
        """
        for candidate in arr:
            token_str = candidate["token_str"]
            if len(token_str) >= crop and token_str[crop:] == token:
                return candidate["score"]
        return 0.

    def _sent_lowercase(self, s):
        """Return ``s`` with its first character lowercased.

        Empty strings and non-string values are returned unchanged.
        """
        if isinstance(s, str) and s:
            return s[0].lower() + s[1:]
        return s

    def _remove_punct(self, s):
        """Return ``s`` without its final character if that is punctuation.

        Only a single trailing ASCII punctuation character (as listed in
        ``string.punctuation``) is removed, so a sentence that does not end in
        punctuation keeps its last letter. Empty strings and non-string values
        are returned unchanged.
        """
        if isinstance(s, str) and s.endswith(_TRAILING_PUNCT):
            return s[:-1]
        return s

    def _build_prompt(self, e1, e2):
        """Join two sentences around the mask token into one fill-mask prompt."""
        return self._remove_punct(e1) + " " + self._mtoken + " " + self._sent_lowercase(e2)

    @staticmethod
    def _as_batch(preds):
        """Return pipeline output as one candidate list per input.

        The fill-mask pipeline hands back a flat list of candidate dicts
        (instead of a list of lists) when it is given a one-element input
        list; wrap that case so callers can always iterate per input.
        """
        if preds and isinstance(preds[0], dict):
            return [preds]
        return preds

    def predict(self, e1, e2, top_k=5):
        """Run the fill-mask pipeline on the prompt built from ``e1`` and ``e2``.

        Returns:
            Whatever the pipeline returns for a single string with ``top_k``
            candidates requested.
        """
        return self.unmasker(self._build_prompt(e1, e2), top_k=top_k)

    def batch_predict(self, instances, top_k=5):
        """Run the fill-mask pipeline on one prompt per ``(e1, e2)`` pair.

        Returns:
            Whatever the pipeline returns for the list of prompts with
            ``top_k`` candidates requested.
        """
        prompts = [self._build_prompt(e1, e2) for (e1, e2) in instances]
        return self.unmasker(prompts, top_k=top_k)

    def get_temp(self, e1, e2, top_k=5, crop=1):
        """Score "e1 before e2" and "e1 after e2".

        Args:
            e1: First sentence.
            e2: Second sentence.
            top_k: Number of fill-mask candidates requested per prompt.
            crop: Passed to ``_extract_token_prob``.

        Returns:
            ``(before_score, after_score)``, each the mean of the score from
            the ``(e1, e2)`` prompt and the mirrored score from the
            ``(e2, e1)`` prompt.
        """
        fwd = self.predict(e1, e2, top_k)
        bwd = self.predict(e2, e1, top_k)

        # "e1 before e2" is supported by "before" in the forward prompt and by
        # "after" in the reversed prompt.
        before_fwd = self._extract_token_prob(fwd, "before", crop=crop)
        before_bwd = self._extract_token_prob(bwd, "after", crop=crop)

        # "e1 after e2" is the mirror image.
        after_fwd = self._extract_token_prob(fwd, "after", crop=crop)
        after_bwd = self._extract_token_prob(bwd, "before", crop=crop)

        return (before_fwd + before_bwd) / 2, (after_fwd + after_bwd) / 2

    def get_temp_batch(self, instances, top_k=5, crop=1):
        """Batched version of ``get_temp``.

        Args:
            instances: Iterable of ``(e1, e2)`` sentence pairs.
            top_k: Number of fill-mask candidates requested per prompt.
            crop: Passed to ``_extract_token_prob``.

        Returns:
            Array of shape ``(len(instances), 2)``; row ``i`` holds
            ``(before_score, after_score)`` for ``instances[i]``, computed the
            same way as in ``get_temp``.
        """
        # Materialise once: the pairs are traversed twice (forward and
        # reversed), which would silently exhaust a one-shot iterator.
        instances = list(instances)
        if not instances:
            return np.empty((0, 2))

        reverse_instances = [(e2, e1) for (e1, e2) in instances]
        fwd_preds = self._as_batch(self.batch_predict(instances, top_k=top_k))
        bwd_preds = self._as_batch(self.batch_predict(reverse_instances, top_k=top_k))

        def scores(preds, token):
            return np.array([self._extract_token_prob(pred, token, crop=crop) for pred in preds])

        # For the reversed prompts the roles of "before" and "after" swap,
        # exactly as in `get_temp`.
        before = (scores(fwd_preds, "before") + scores(bwd_preds, "after")) / 2
        after = (scores(fwd_preds, "after") + scores(bwd_preds, "before")) / 2
        return np.array([before, after]).T

    def __call__(self, *args, **kwargs):
        """Shorthand for ``get_temp``."""
        return self.get_temp(*args, **kwargs)