| | import numpy as np |
| | from numpy import dot |
| | from numpy.linalg import norm |
| | import os |
| | from scipy import stats |
| | import shutil |
| | from sklearn import metrics |
| | import torch |
| | from torch import Tensor |
| | import torch.distributed as dist |
| | import torch.autograd as autograd |
| | from typing import TypeVar, Optional, Iterator, Sequence |
| | from torch.utils.data import Dataset, Sampler, DistributedSampler |
| | import math |
| |
|
# Covariant element type used by the Sampler/Dataset generics below.
T_co = TypeVar('T_co', covariant=True)
| |
|
class WeightedRandomSampler(Sampler[int]):
    r"""Draw sample indices from ``[0, len(weights) - 1]`` with probability
    proportional to the given weights.

    Args:
        weights (sequence): per-index sampling weights; they do not need to
            sum to one.
        num_samples (int): number of indices to draw per pass.
        replacement (bool): when ``True`` an index may be drawn repeatedly;
            when ``False`` each index is drawn at most once.
        generator (Generator): optional RNG used for the draw.

    Example:
        >>> list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
        [4, 4, 1, 4, 5]
        >>> list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
        [0, 1, 4, 3, 2]
    """
    weights: Tensor
    num_samples: int
    replacement: bool

    def __init__(self, weights: Sequence[float], num_samples: int,
                 replacement: bool = True, generator=None) -> None:
        # Reject bools explicitly: bool is a subclass of int in Python.
        count_ok = (isinstance(num_samples, int)
                    and not isinstance(num_samples, bool)
                    and num_samples > 0)
        if not count_ok:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(num_samples))
        if not isinstance(replacement, bool):
            raise ValueError("replacement should be a boolean value, but got "
                             "replacement={}".format(replacement))
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        self.replacement = replacement
        self.generator = generator

    def __iter__(self) -> Iterator[int]:
        # Single multinomial draw produces the whole epoch's indices.
        drawn = torch.multinomial(self.weights, self.num_samples,
                                  self.replacement, generator=self.generator)
        yield from drawn.tolist()

    def __len__(self) -> int:
        return self.num_samples
| |
|
class DistributedSamplerWrapper(DistributedSampler):
    """Shard the output of an arbitrary (e.g. weighted) sampler across
    distributed ranks.

    Every rank seeds the wrapped sampler identically from
    ``seed + epoch``, materializes the same index sequence, then keeps an
    interleaved slice of it.
    """

    def __init__(
            self, sampler, dataset,
            num_replicas=None,
            rank=None,
            shuffle: bool = True):
        super().__init__(dataset, num_replicas, rank, shuffle)
        self.sampler = sampler

    def __iter__(self):
        # Lazily attach a generator, then re-seed it so every rank draws
        # the same base sequence for this epoch.
        if self.sampler.generator is None:
            self.sampler.generator = torch.Generator()
        self.sampler.generator.manual_seed(self.seed + self.epoch)
        indices = list(self.sampler)
        if self.epoch == 0:
            print(f"\n DistributedSamplerWrapper : {indices[:10]} \n\n")
        # Interleaved shard: rank r takes every num_replicas-th index.
        # NOTE(review): assumes the wrapped sampler yields total_size
        # indices — confirm its num_samples matches len(dataset).
        return iter(indices[self.rank:self.total_size:self.num_replicas])
| | |
| |
|
class DistributedWeightedSampler(Sampler):
    """Weighted random sampler for distributed training.

    Every rank builds the same epoch-seeded permutation of the dataset,
    keeps an interleaved shard of it, then draws ``num_samples`` indices
    from its shard with probability proportional to ``weights``.

    Args:
        dataset: dataset to sample from (only ``len()`` is used).
        weights: per-example sampling weights, one per dataset element.
        num_replicas: number of participating processes; defaults to the
            distributed world size.
        rank: rank of the current process; defaults to the distributed rank.
        replacement: draw with replacement when ``True``.
        shuffle: shuffle the base permutation each epoch when ``True``.
        seed: base random seed shared by all ranks.
        drop_last: drop the tail of the dataset so every rank gets an equal
            share without padding.

    Raises:
        RuntimeError: if ``num_replicas``/``rank`` are omitted while the
            distributed package is unavailable.
        ValueError: if ``rank`` is outside ``[0, num_replicas - 1]``.
    """

    weights: Tensor
    num_samples: int
    replacement: bool

    def __init__(self, dataset: Dataset, weights: Sequence[float], num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, replacement: bool = True, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        if rank >= num_replicas or rank < 0:
            raise ValueError(
                "Invalid rank {}, rank should be in the interval"
                " [0, {}]".format(rank, num_replicas - 1))
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:
            # Drop the tail so each rank receives the same number of
            # indices without padding (mirrors torch's DistributedSampler).
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)
        self.total_size = self.num_samples * self.num_replicas
        self.replacement = replacement
        # BUG FIX: torch.from_numpy() rejects anything that is not a numpy
        # ndarray, although the annotation promises Sequence[float].
        # torch.as_tensor() accepts lists/tuples and still shares memory
        # with a passed-in numpy array (same behavior as before for them).
        self.weights = torch.as_tensor(weights)
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[int]:
        # Epoch-seeded permutation, identical on every rank.
        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        if not self.drop_last:
            # Pad by repeating indices until the list divides evenly.
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # Remove the tail to make the list evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # Interleaved shard for this rank.
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        # Weighted draw restricted to this rank's shard; the drawn
        # positions are mapped back to dataset indices.
        # NOTE(review): no generator is passed to multinomial, so the draw
        # uses the global RNG and is not reproducible from (seed, epoch)
        # alone — confirm whether that is intended.
        shard_weights = self.weights[indices]
        drawn = torch.multinomial(shard_weights, self.num_samples, self.replacement)
        dataset_indices = torch.tensor(indices)[drawn]
        return iter(dataset_indices.tolist())

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch) -> None:
        # Call at the start of each epoch so shuffling differs per epoch.
        self.epoch = epoch
| |
|
def off_diagonal(x):
    """Return all off-diagonal elements of a square matrix, flattened in
    row-major order."""
    rows, cols = x.shape
    assert rows == cols
    keep = ~torch.eye(rows, dtype=torch.bool, device=x.device)
    return x[keep].flatten()
| |
|
def is_dist_avail_and_initialized():
    """Return True iff torch.distributed is usable and a process group has
    been initialized."""
    return dist.is_available() and dist.is_initialized()
| |
|
def get_world_size():
    """Number of distributed processes; 1 when not running distributed."""
    if is_dist_avail_and_initialized():
        return dist.get_world_size()
    return 1
| |
|
| |
|
def get_rank():
    """Rank of this process; 0 when not running distributed."""
    if is_dist_avail_and_initialized():
        return dist.get_rank()
    return 0
| |
|
| |
|
def is_main_process():
    """True only on rank 0 (always True outside distributed mode)."""
    return get_rank() == 0
| |
|
| |
|
def save_on_master(state, is_best, output_dir):
    """Save a checkpoint from the main process only; additionally copy it
    to ``checkpoint_best.pt`` when this is the best model so far."""
    if not is_main_process():
        return
    ckpt_path = f'{output_dir}/checkpoint.pt'
    best_path = f'{output_dir}/checkpoint_best.pt'
    torch.save(state, ckpt_path)
    if is_best:
        shutil.copyfile(ckpt_path, best_path)
| |
|
def init_distributed_mode(args):
    """Initialize torch.distributed from launcher environment variables.

    Supports torchrun-style launches (RANK / WORLD_SIZE / LOCAL_RANK) and
    SLURM (SLURM_PROCID); otherwise disables distributed mode. Mutates
    ``args`` in place (rank, world_size, gpu, distributed, dist_backend).
    """
    env = os.environ
    if 'RANK' in env and 'WORLD_SIZE' in env:
        # torchrun / torch.distributed.launch path.
        args.rank = int(env["RANK"])
        args.world_size = int(env['WORLD_SIZE'])
        args.gpu = int(env['LOCAL_RANK'])
    elif 'SLURM_PROCID' in env:
        # SLURM path: derive the local GPU from the global proc id.
        # NOTE(review): args.world_size is not set here — confirm callers
        # provide it before init_process_group is reached.
        args.rank = int(env['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        return

    args.distributed = True

    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'
    print('| distributed init (rank {}): {}'.format(
        args.rank, args.dist_url), flush=True)
    torch.distributed.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                         world_size=args.world_size, rank=args.rank)
    torch.distributed.barrier()
    # Restrict printing to the master rank (helper defined elsewhere).
    setup_for_distributed(args.rank == 0)
| |
|
| |
|
def scaled_all_reduce(tensors, is_scale=True):
    """All-reduce (sum) every tensor in-place across the world.

    When ``is_scale`` is True the reduced values are divided by the world
    size, yielding the mean. Only the sum reduction is supported.

    Args:
        tensors: list of tensors modified in-place.
        is_scale: scale results by ``1 / world_size`` when True.

    Returns:
        The same list of tensors.
    """
    world_size = get_world_size()
    # Nothing to reduce in a single-process run.
    if world_size == 1:
        return tensors

    # Launch every reduction asynchronously, then wait for all of them.
    handles = [dist.all_reduce(t, async_op=True) for t in tensors]
    for handle in handles:
        handle.wait()

    if is_scale:
        inv = 1.0 / world_size
        for t in tensors:
            t.mul_(inv)
    return tensors
| |
|
| |
|
def all_gather_batch(tensors):
    """All-gather each tensor across the world and concatenate the shards
    along dim 0.

    Gradients do NOT flow through the gathered results (plain
    dist.all_gather). Inputs are returned unchanged in single-process runs.
    """
    world_size = get_world_size()
    if world_size == 1:
        return tensors

    gathered = []
    for t in tensors:
        shards = [torch.ones_like(t) for _ in range(world_size)]
        dist.all_gather(
            shards,
            t,
            async_op=False
        )
        gathered.append(shards)

    return [torch.cat(shards, dim=0) for shards in gathered]
| |
|
| |
|
class GatherLayer(autograd.Function):
    """All-gather with backward support.

    Unlike torch.distributed.all_gather, gradients propagate back to the
    local input: backward all-reduces the incoming grads and returns this
    rank's slice.
    """

    @staticmethod
    def forward(ctx, x):
        world_size = dist.get_world_size()
        gathered = [torch.zeros_like(x) for _ in range(world_size)]
        dist.all_gather(gathered, x)
        return tuple(gathered)

    @staticmethod
    def backward(ctx, *grads):
        stacked = torch.stack(grads)
        dist.all_reduce(stacked)
        # Each rank keeps only the gradient for its own input slice.
        return stacked[dist.get_rank()]
| |
|
| |
|
def all_gather_batch_with_grad(tensors):
    """All-gather each tensor across the world and concatenate along dim 0,
    keeping the autograd graph connected via GatherLayer.
    """
    world_size = get_world_size()
    # Single process: the graph is already connected, return as-is.
    if world_size == 1:
        return tensors

    gathered = [GatherLayer.apply(t) for t in tensors]
    return [torch.cat(parts, dim=0) for parts in gathered]
| |
|
| |
|
class AverageMeter(object):
    """Tracks the latest value and a running average of a metric."""

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Clear all accumulated state."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` observed ``n`` times and refresh the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def cat(self, val, n=1):
        """Alias of :meth:`update`, kept for callers using this name."""
        self.update(val, n)

    def synchronize(self):
        """Sum ``sum`` and ``count`` across all ranks (no-op when not
        running distributed)."""
        if not is_dist_avail_and_initialized():
            return
        t = torch.tensor([self.sum, self.count], dtype=torch.float64, device='cuda')
        dist.barrier()
        dist.all_reduce(t)
        total, count = t.tolist()
        if math.isnan(total):
            # Guard against NaNs poisoning the globally averaged value.
            self.sum = 1e9
        else:
            # NOTE(review): int() truncates fractional sums — confirm this
            # meter is only synchronized for integer-valued quantities.
            self.sum = int(total)
        self.count = count
        self.avg = self.sum / self.count

    def __str__(self):
        template = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return template.format(**self.__dict__)
| |
|
| | |
class ProgressMeter(object):
    """Formats and prints a batch-progress line from a list of meters."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Print and return ``['<prefix>[  b/N]', str(meter), ...]``."""
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries.extend(str(meter) for meter in self.meters)
        print('\t'.join(entries))
        return entries

    def synchronize(self):
        """Synchronize every tracked meter across ranks."""
        for meter in self.meters:
            meter.synchronize()

    def _get_batch_fmtstr(self, num_batches):
        # Width of the counter field, e.g. 100 -> '[{:3d}/100]'.
        width = len(str(num_batches // 1))
        counter = '{:' + str(width) + 'd}'
        return '[' + counter + '/' + counter.format(num_batches) + ']'
| |
|
def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy (in percent) for every k in ``topk``.

    Args:
        output: (batch, classes) score tensor.
        target: (batch,) tensor of ground-truth class indices.
        topk: tuple of k values to evaluate.

    Returns:
        List of 1-element tensors, one accuracy per requested k.
    """
    with torch.no_grad():
        batch_size = target.size(0)
        # One top-k pass at the largest k covers all requested ks.
        _, pred = output.topk(max(topk), 1, True, True)
        pred = pred.t()
        hits = pred.eq(target.reshape(1, -1).expand_as(pred))

        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / batch_size)
            for k in topk
        ]
| |
|
| | |
| |
|
def d_prime(auc):
    """Convert an AUC value to d-prime: sqrt(2) * Phi^-1(auc)."""
    return stats.norm().ppf(auc) * np.sqrt(2.0)
| |
|
def calculate_stats(output, target):
    """Calculate per-class statistics (AP, AUC, ROC/PR curves) and overall
    arg-max accuracy.

    Args:
        output: 2d tensor, (samples_num, classes_num) of scores.
        target: 2d tensor, (samples_num, classes_num) of binary labels.

    Returns:
        List of per-class dicts with keys 'precisions', 'recalls', 'AP',
        'fpr', 'fnr', 'auc', 'acc'. Curve arrays are subsampled; fields
        that cannot be computed (e.g. no positive sample) are -1.
    """
    classes_num = target.shape[-1]
    per_class_stats = []

    # NOTE(review): .cpu() assumes torch tensors — confirm callers never
    # pass plain numpy arrays.
    output = output.cpu()
    target = target.cpu()

    # Overall top-1 accuracy from the arg-max class of each row.
    acc = metrics.accuracy_score(np.argmax(target, 1), np.argmax(output, 1))

    for k in range(classes_num):
        # Average precision is defined even for degenerate label columns.
        avg_precision = metrics.average_precision_score(
            target[:, k], output[:, k], average=None)

        try:
            auc = metrics.roc_auc_score(target[:, k], output[:, k], average=None)

            (precisions, recalls, thresholds) = metrics.precision_recall_curve(
                target[:, k], output[:, k])

            (fpr, tpr, thresholds) = metrics.roc_curve(target[:, k], output[:, k])

            # Subsample the curves to keep the result small.
            save_every_steps = 1000
            class_stats = {'precisions': precisions[0::save_every_steps],
                           'recalls': recalls[0::save_every_steps],
                           'AP': avg_precision,
                           'fpr': fpr[0::save_every_steps],
                           'fnr': 1. - tpr[0::save_every_steps],
                           'auc': auc,
                           'acc': acc
                           }
        except Exception:
            # BUG FIX: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit. AUC/ROC are undefined when a
            # class has no positive sample; fall back to sentinel values.
            class_stats = {'precisions': -1,
                           'recalls': -1,
                           'AP': avg_precision,
                           'fpr': -1,
                           'fnr': -1,
                           'auc': -1,
                           'acc': acc
                           }
            print('class {:s} no true sample'.format(str(k)))
        per_class_stats.append(class_stats)

    return per_class_stats
| |
|
| |
|
| | |
def get_similarity(a, b):
    """Cosine similarity between two 1-D vectors."""
    denominator = norm(a) * norm(b)
    return dot(a, b) / denominator
| |
|
| | |
def get_sim_mat(a, b):
    """Pairwise cosine-similarity matrix between the rows of two matrices.

    Args:
        a: (B, D) array of row vectors.
        b: (B2, D) array of row vectors. (The original required
           ``B2 == B``; any row count is now supported.)

    Returns:
        (B, B2) float64 array whose entry (i, j) is the cosine similarity
        between ``a[i]`` and ``b[j]``.
    """
    a = np.asarray(a, dtype=np.float64)
    b = np.asarray(b, dtype=np.float64)
    # Normalize rows once, then one matmul replaces the former O(B^2)
    # Python double loop of per-pair dot/norm calls.
    a_unit = a / norm(a, axis=1, keepdims=True)
    b_unit = b / norm(b, axis=1, keepdims=True)
    return a_unit @ b_unit.T
| |
|
def compute_metrics(x):
    """Retrieval metrics from a similarity matrix ``x`` (queries x items).

    The correct item for query ``i`` is assumed to sit at ``x[i, i]``.
    Returns recall@1/5/10 and the 1-based median rank of the diagonal.
    """
    # Rank of each diagonal entry within its row, sorted by descending
    # similarity (negation turns np.sort into a descending sort).
    neg_sorted = np.sort(-x, axis=1)
    diag = np.diag(-x)[:, np.newaxis]
    ranks = np.where(neg_sorted - diag == 0)[1]

    total = len(ranks)
    return {
        'R1': float(np.sum(ranks == 0)) / total,
        'R5': float(np.sum(ranks < 5)) / total,
        'R10': float(np.sum(ranks < 10)) / total,
        'MR': np.median(ranks) + 1,
    }