# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# LASER Language-Agnostic SEntence Representations
# is a toolkit to calculate multilingual sentence embeddings
# and to use them for document classification, bitext filtering
# and mining
#
# --------------------------------------------------------
#
# Tool to calculate multilingual similarity error rate (xSIM)

import faiss
import numpy as np
import typing as tp
import os
import json
from enum import Enum


class Margin(Enum):
    RATIO = "ratio"
    DISTANCE = "distance"
    ABSOLUTE = "absolute"

    @classmethod
    def has_value(cls, value):
        return value in cls._value2member_map_


def xSIM(
    x: tp.Union[str, np.ndarray],
    y: tp.Union[str, np.ndarray],
    margin: str = Margin.RATIO.value,
    k: int = 4,
    dim: int = 1024,
    fp16: bool = False,
    eval_text: str = None,
    augmented_json: str = None,
) -> tp.Tuple[int, int, tp.Dict[str, int]]:
    assert Margin.has_value(margin), f"Margin type: {margin}, is not supported."
    if not isinstance(x, np.ndarray):
        x = _load_embeddings(x, dim, fp16)
    if not isinstance(y, np.ndarray):
        y = _load_embeddings(y, dim, fp16)
    # calculate xSIM error
    return calculate_error(x, y, margin, k, eval_text, augmented_json)


def _load_embeddings(infile: str, dim: int, fp16: bool = False) -> np.ndarray:
    assert os.path.isfile(infile), f"file: {infile} does not exist."
    emb = np.fromfile(infile, dtype=np.float16 if fp16 else np.float32)
    num_examples = emb.shape[0] // dim
    emb.resize(num_examples, dim)
    if fp16:
        emb = emb.astype(np.float32)  # faiss currently only supports fp32
    return emb


def score_margin(
    Dxy: np.ndarray,
    Ixy: np.ndarray,
    Ax: np.ndarray,
    Ay: np.ndarray,
    margin: str,
    k: int,
) -> np.ndarray:
    # margin-based rescoring: compare each forward cosine a = cos(x_i, y_jj)
    # against b, the average of the mean k-NN cosines of x_i and y_jj
    nbex = Dxy.shape[0]
    scores = np.zeros((nbex, k))
    for i in range(nbex):
        for j in range(k):
            jj = Ixy[i, j]
            a = Dxy[i, j]
            b = (Ax[i] + Ay[jj]) / 2
            if margin == Margin.RATIO.value:
                scores[i, j] = a / b
            else:  # distance margin
                scores[i, j] = a - b
    return scores


def _score_knn(x: np.ndarray, y: np.ndarray, k: int, margin: str) -> np.ndarray:
    nbex, dim = x.shape
    # create index
    idx_x = faiss.IndexFlatIP(dim)
    idx_y = faiss.IndexFlatIP(dim)
    # L2 normalization needed for cosine distance
    faiss.normalize_L2(x)
    faiss.normalize_L2(y)
    idx_x.add(x)
    idx_y.add(y)
    if margin == Margin.ABSOLUTE.value:
        scores, indices = idx_y.search(x, 1)
    else:
        # return cosine similarity and indices of k closest neighbors
        Cos_xy, Idx_xy = idx_y.search(x, k)
        Cos_yx, Idx_yx = idx_x.search(y, k)
        # average cosines
        Avg_xy = Cos_xy.mean(axis=1)
        Avg_yx = Cos_yx.mean(axis=1)
        scores = score_margin(Cos_xy, Idx_xy, Avg_xy, Avg_yx, margin, k)
        # find best
        best = scores.argmax(axis=1)
        indices = np.zeros((nbex, 1), dtype=np.int32)
        for i in range(nbex):
            indices[i] = Idx_xy[i, best[i]]
    return indices


def get_transform(augmented_json, closest_neighbor, src):
    # look up the error type of an augmented (transformed) target sentence;
    # anything not found for this source sentence counts as plain misalignment
    if (
        closest_neighbor in augmented_json
        and augmented_json[closest_neighbor]["src"] == src
    ):
        return augmented_json[closest_neighbor]["errtype"]
    return "Misaligned"


def calculate_error(
    x: np.ndarray,
    y: np.ndarray,
    margin: str = None,
    k: int = 4,
    eval_text: str = None,
    augmented_json: str = None,
) -> tp.Tuple[int, int, tp.Dict[str, int]]:
    if augmented_json:
        with open(augmented_json) as f:
            augmented_json = json.load(f)
        # with augmented targets, y contains more sentences than x
        assert (
            x.shape[0] < y.shape[0]
        ), f"Shape mismatch: source {x.shape[0]} >= target {y.shape[0]}"
    else:
        assert (
            x.shape == y.shape
        ), f"number of source {x.shape} / target {y.shape} shapes mismatch"
    nbex = x.shape[0]
    augmented_report = {}

    # for each x calculate the highest scoring neighbor from y
    closest_neighbor = _score_knn(x, y, k, margin)

    if eval_text:  # calc textual error
        with open(eval_text, encoding="utf-8", errors="surrogateescape") as f:
            lines = f.readlines()
        err = 0
        for ex in range(nbex):
            if lines[ex] != lines[closest_neighbor[ex, 0]]:
                err += 1
                if augmented_json:
                    transform = get_transform(
                        augmented_json,
                        lines[closest_neighbor[ex, 0]].strip(),
                        lines[ex].strip(),
                    )
                    augmented_report[transform] = augmented_report.get(transform, 0) + 1
    else:  # calc index error
        ref = np.linspace(0, nbex - 1, nbex).astype(int)  # [0, nbex)
        err = nbex - np.equal(closest_neighbor.reshape(nbex), ref).astype(int).sum()
    return err, nbex, augmented_report
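

# --------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes two
# hypothetical embedding files, "source.emb" and "target.emb", written as raw
# float32 LASER embeddings with 1024 dimensions per sentence; adjust dim/fp16
# to match however the embeddings were actually produced.
if __name__ == "__main__":
    err, nbex, _ = xSIM(
        "source.emb",  # hypothetical path to source-language embeddings
        "target.emb",  # hypothetical path to target-language embeddings
        margin=Margin.RATIO.value,
        k=4,
        dim=1024,
    )
    print(f"xSIM error: {err}/{nbex} = {100 * err / nbex:.2f}%")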
