import sys
import time
import types

import chardet
import numpy as np
import torch
import torch.distributed as dist

from utils.ckpt_utils import load_ckpt


def reduce_tensors(metrics):
    """Average every tensor in a (possibly nested) metrics dict across ranks.

    Each tensor value is summed across processes with ``dist.all_reduce``
    (which overwrites that tensor in place) and then divided by the world
    size. Nested plain ``dict`` values are processed recursively; all other
    values are copied through unchanged.

    Args:
        metrics (dict): mapping of names to tensors, nested dicts, or other values.

    Returns:
        dict: a new dict with the same keys holding the averaged values.
    """
    new_metrics = {}
    for k, v in metrics.items():
        if isinstance(v, torch.Tensor):
            # all_reduce sums in place across ranks; divide to get the mean.
            dist.all_reduce(v)
            v = v / dist.get_world_size()
        if type(v) is dict:
            v = reduce_tensors(v)
        new_metrics[k] = v
    return new_metrics


def tensors_to_scalars(tensors):
    """Recursively convert tensors to Python scalars via ``.item()``.

    Dicts and lists are traversed recursively; any other value is returned
    unchanged.

    Args:
        tensors: a tensor, a dict/list of (nested) values, or anything else.

    Returns:
        The same structure with every tensor replaced by ``tensor.item()``.
    """
    if isinstance(tensors, torch.Tensor):
        return tensors.item()
    if isinstance(tensors, dict):
        return {k: tensors_to_scalars(v) for k, v in tensors.items()}
    if isinstance(tensors, list):
        return [tensors_to_scalars(v) for v in tensors]
    return tensors


def tensors_to_np(tensors):
    """Convert a tensor, or the tensors inside a dict/list, to numpy arrays.

    Tensors are moved to CPU before conversion. Nested plain ``dict`` values
    are converted recursively; other values are copied through unchanged.

    Args:
        tensors: a ``torch.Tensor``, a dict, or a list.

    Returns:
        A numpy array (for a tensor input), or a new dict/list of converted values.

    Raises:
        Exception: if ``tensors`` is not a dict, list, or ``torch.Tensor``.
    """
    if isinstance(tensors, dict):
        new_np = {}
        for k, v in tensors.items():
            if isinstance(v, torch.Tensor):
                v = v.cpu().numpy()
            if type(v) is dict:
                v = tensors_to_np(v)
            new_np[k] = v
    elif isinstance(tensors, list):
        new_np = []
        for v in tensors:
            if isinstance(v, torch.Tensor):
                v = v.cpu().numpy()
            if type(v) is dict:
                v = tensors_to_np(v)
            new_np.append(v)
    elif isinstance(tensors, torch.Tensor):
        # A bare tensor: convert directly (it can never be a dict here).
        new_np = tensors.cpu().numpy()
    else:
        raise Exception(f'tensors_to_np does not support type {type(tensors)}.')
    return new_np


def move_to_cpu(tensors):
    """Return a new dict with every tensor moved to CPU (recursing into plain dicts).

    Args:
        tensors (dict): mapping of names to tensors, nested dicts, or other values.

    Returns:
        dict: a new dict with the same keys; non-tensor values are passed through.
    """
    ret = {}
    for k, v in tensors.items():
        if isinstance(v, torch.Tensor):
            v = v.cpu()
        if type(v) is dict:
            v = move_to_cpu(v)
        ret[k] = v
    return ret


def move_to_cuda(batch, gpu_id=0):
    """Recursively move ``batch`` to CUDA device ``gpu_id``.

    Objects exposing a callable ``cuda`` (or, failing that, ``to``) method are
    moved with ``non_blocking=True``. Lists and dicts are updated IN PLACE and
    returned; tuples are rebuilt. Anything else is returned unchanged.

    Args:
        batch: a tensor/module-like object, or a list/tuple/dict of them.
        gpu_id (int): index of the target CUDA device.

    Returns:
        The moved object (same container type as the input).
    """
    # Base case: the object can be moved directly using `cuda` or `to`.
    if callable(getattr(batch, 'cuda', None)):
        return batch.cuda(gpu_id, non_blocking=True)
    elif callable(getattr(batch, 'to', None)):
        return batch.to(torch.device('cuda', gpu_id), non_blocking=True)
    elif isinstance(batch, list):
        for i, x in enumerate(batch):
            batch[i] = move_to_cuda(x, gpu_id)
        return batch
    elif isinstance(batch, tuple):
        batch = list(batch)
        for i, x in enumerate(batch):
            batch[i] = move_to_cuda(x, gpu_id)
        return tuple(batch)
    elif isinstance(batch, dict):
        for k, v in batch.items():
            batch[k] = move_to_cuda(v, gpu_id)
        return batch
    return batch


class AvgrageMeter(object):
    """Running weighted average of scalar values.

    Attributes are ``sum`` (weighted total), ``cnt`` (total weight) and
    ``avg`` (``sum / cnt``, recomputed on every update).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear the accumulated sum, count, and average."""
        self.avg = 0
        self.sum = 0
        self.cnt = 0

    def update(self, val, n=1):
        """Add ``val`` with weight ``n`` and refresh the running average."""
        self.sum += val * n
        self.cnt += n
        self.avg = self.sum / self.cnt


# Correctly spelled alias; the original (misspelled) name is kept for existing callers.
AverageMeter = AvgrageMeter


def collate_1d(values, pad_idx=0, left_pad=False, shift_right=False, max_len=None, shift_id=1):
    """Convert a list of 1d tensors into a padded 2d tensor.

    Args:
        values (list[Tensor]): 1d tensors, possibly of different lengths.
        pad_idx: fill value for padded positions.
        left_pad (bool): place each sequence at the end of its row instead of the start.
        shift_right (bool): shift each sequence right by one, dropping its last
            element and writing ``shift_id`` in the first position.
        max_len (int, optional): output width; defaults to the longest sequence.
        shift_id: value written at position 0 when ``shift_right`` is set.

    Returns:
        Tensor of shape ``(len(values), size)`` with the dtype/device of ``values[0]``.
    """
    size = max(v.size(0) for v in values) if max_len is None else max_len
    res = values[0].new(len(values), size).fill_(pad_idx)

    def copy_tensor(src, dst):
        assert dst.numel() == src.numel()
        if shift_right:
            dst[1:] = src[:-1]
            dst[0] = shift_id
        else:
            dst.copy_(src)

    for i, v in enumerate(values):
        copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
    return res


def collate_2d(values, pad_idx=0, left_pad=False, shift_right=False, max_len=None):
    """Convert a list of 2d tensors into a padded 3d tensor.

    Sequences are padded along their first dimension; the second dimension is
    taken from ``values[0]`` and assumed equal for all inputs.

    Args:
        values (list[Tensor]): 2d tensors ``(T_i, C)``.
        pad_idx: fill value for padded positions.
        left_pad (bool): place each sequence at the end of its slot instead of the start.
        shift_right (bool): shift each sequence right by one row, dropping its
            last row (the first row keeps ``pad_idx``).
        max_len (int, optional): padded length; defaults to the longest sequence.

    Returns:
        Tensor of shape ``(len(values), size, C)``.
    """
    size = max(v.size(0) for v in values) if max_len is None else max_len
    res = values[0].new(len(values), size, values[0].shape[1]).fill_(pad_idx)

    def copy_tensor(src, dst):
        assert dst.numel() == src.numel()
        if shift_right:
            dst[1:] = src[:-1]
        else:
            dst.copy_(src)

    for i, v in enumerate(values):
        copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
    return res


def _is_batch_full(batch, num_tokens, max_tokens, max_sentences):
    """Return 1 if ``batch`` must be flushed before adding another item, else 0.

    An empty batch is never full; otherwise it is full when it already holds
    ``max_sentences`` items or ``num_tokens`` exceeds ``max_tokens``.
    """
    if len(batch) == 0:
        return 0
    if len(batch) == max_sentences:
        return 1
    if num_tokens > max_tokens:
        return 1
    return 0


def batch_by_size(
        indices, num_tokens_fn, max_tokens=None, max_sentences=None,
        required_batch_size_multiple=1, distributed=False
):
    """
    Yield mini-batches of indices bucketed by size. Batches may contain
    sequences of different lengths.

    Args:
        indices (List[int]): ordered list of dataset indices
        num_tokens_fn (callable): function that returns the number of tokens at
            a given index
        max_tokens (int, optional): max number of tokens in each batch
            (default: None).
        max_sentences (int, optional): max number of sentences in each
            batch (default: None).
        required_batch_size_multiple (int, optional): require batch size to
            be a multiple of N (default: 1).
        distributed (bool, optional): accepted for API compatibility; currently
            unused by this function.

    Returns:
        list[list]: the batches, in order. A batch's token cost is estimated
        as ``len(batch) * longest_item``.
    """
    max_tokens = max_tokens if max_tokens is not None else sys.maxsize
    max_sentences = max_sentences if max_sentences is not None else sys.maxsize
    bsz_mult = required_batch_size_multiple

    if isinstance(indices, types.GeneratorType):
        indices = np.fromiter(indices, dtype=np.int64, count=-1)

    sample_len = 0
    # Lengths of the items in `batch` plus the item currently being considered.
    sample_lens = []
    batch = []
    batches = []
    for idx in indices:
        num_tokens = num_tokens_fn(idx)
        sample_lens.append(num_tokens)
        sample_len = max(sample_len, num_tokens)

        assert sample_len <= max_tokens, (
            "sentence at index {} of size {} exceeds max_tokens "
            "limit of {}!".format(idx, sample_len, max_tokens)
        )
        # Padded cost of the batch if `idx` were added to it.
        num_tokens = (len(batch) + 1) * sample_len

        if _is_batch_full(batch, num_tokens, max_tokens, max_sentences):
            # Emit the largest prefix that is a multiple of bsz_mult (or the
            # remainder, if that is larger); leftovers roll into the next batch.
            mod_len = max(
                bsz_mult * (len(batch) // bsz_mult),
                len(batch) % bsz_mult,
            )
            batches.append(batch[:mod_len])
            batch = batch[mod_len:]
            sample_lens = sample_lens[mod_len:]
            sample_len = max(sample_lens) if len(sample_lens) > 0 else 0
        batch.append(idx)
    if len(batch) > 0:
        batches.append(batch)
    return batches


def unpack_dict_to_list(samples):
    """Split a batched dict into a list of per-sample dicts.

    The batch size is read from ``samples['outputs'].size(0)``. For each
    sample ``i``, every value that supports ``v[i]`` contributes ``v[i]``;
    values that cannot be indexed that way are skipped for that sample.

    Args:
        samples (dict): batched values; must contain an ``'outputs'`` tensor.

    Returns:
        list[dict]: one dict per sample.
    """
    samples_ = []
    bsz = samples.get('outputs').size(0)
    for i in range(bsz):
        res = {}
        for k, v in samples.items():
            try:
                res[k] = v[i]
            except Exception:
                # Best-effort: skip non-indexable / too-short values. Unlike a
                # bare `except:`, this lets KeyboardInterrupt/SystemExit through.
                pass
        samples_.append(res)
    return samples_


def remove_padding(x, padding_idx=0):
    """Drop padding entries from a 1d or 2d array.

    For 2d input ``[T, H]``, rows whose absolute-value sum equals
    ``padding_idx`` are removed (meaningful mainly for ``padding_idx=0``,
    i.e. all-zero rows). For 1d input ``[T]``, elements equal to
    ``padding_idx`` are removed.

    NOTE(review): ``np.abs`` is applied, so ``x`` is presumably a numpy array
    or CPU tensor — confirm against callers.

    Returns:
        The filtered array, or ``None`` if ``x`` is ``None``.
    """
    if x is None:
        return None
    assert len(x.shape) in [1, 2]
    if len(x.shape) == 2:  # [T, H]
        return x[np.abs(x).sum(-1) != padding_idx]
    elif len(x.shape) == 1:  # [T]
        return x[x != padding_idx]


class Timer:
    """Context manager accumulating wall-clock time per name.

    Totals are stored in the class-level ``Timer.timer_map`` (shared by all
    instances). When ``enable`` is true, CUDA is synchronized (if available)
    on entry and exit so queued GPU work is included, and the cumulative
    total for ``name`` is printed on exit. When disabled, it does nothing.
    """
    timer_map = {}

    def __init__(self, name, enable=False):
        if name not in Timer.timer_map:
            Timer.timer_map[name] = 0
        self.name = name
        self.enable = enable

    def __enter__(self):
        if self.enable:
            if torch.cuda.is_available():
                torch.cuda.synchronize()
            self.t = time.time()
        # Returning self makes `with Timer(...) as t:` usable.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.enable:
            if torch.cuda.is_available():
                torch.cuda.synchronize()
            Timer.timer_map[self.name] += time.time() - self.t
            print(f'[Timer] {self.name}: {Timer.timer_map[self.name]}')


def print_arch(model, model_name='model'):
    """Print the model's repr followed by its trainable parameter count."""
    print(f"| {model_name} Arch: ", model)
    num_params(model, model_name=model_name)


def num_params(model, print_out=True, model_name="model"):
    """Return (and optionally print) the number of trainable parameters in millions.

    Args:
        model: object exposing ``parameters()`` (e.g. a ``torch.nn.Module``).
        print_out (bool): print the count when true.
        model_name (str): label used in the printed line.

    Returns:
        float: count of parameters with ``requires_grad`` set, divided by 1e6.
    """
    parameters = sum(p.numel() for p in model.parameters() if p.requires_grad) / 1_000_000
    if print_out:
        # Pure f-string: the previous f-string + `%` mix broke when model_name contained '%'.
        print(f'| {model_name} Trainable Parameters: {parameters:.3f}M')
    return parameters


def get_encoding(file):
    """Guess the text encoding of ``file`` with ``chardet``.

    The whole file is read in binary mode. A ``'GB2312'`` guess is widened
    to ``'GB18030'`` (a superset of GB2312) to avoid decode errors on
    characters outside GB2312.

    Returns:
        str | None: the encoding name reported by chardet (possibly ``None``).
    """
    with open(file, 'rb') as f:
        encoding = chardet.detect(f.read())['encoding']
    if encoding == 'GB2312':
        encoding = 'GB18030'
    return encoding