import numpy as np
import torch


def costume_collate(data, max_len=None, mask_compensation=False):
    """Build mini-batch tensors from a list of (X, mask) tuples. Masks the input and
    creates the padding masks needed for variable-length batches.

    Args:
        data: len(batch_size) list of tuples (X, mask).
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - mask: boolean torch tensor of shape (seq_length, feat_dim); variable seq_length.
        max_len: global fixed sequence length. Used for architectures requiring fixed-length
            input, where the batch length cannot vary dynamically. Longer sequences are
            clipped, shorter ones are padded with 0s.
        mask_compensation: if True, rescale the unmasked features so that the matrix
            product W @ X is unaffected on average (see `compensate_masking`).

    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of masked features (input)
        targets: (batch_size, padded_length, feat_dim) torch tensor of unmasked features (output)
        target_masks: (batch_size, padded_length, feat_dim) boolean torch tensor;
            0 indicates masked values to be predicted, 1 indicates unaffected ("active")
            feature values
        padding_masks: (batch_size, padded_length) boolean tensor; 1 means keep the vector
            at this position, 0 means ignore (padding)
    """
    batch_size = len(data)
    features, masks = zip(*data)

    # Stack and pad features and masks (convert 2D to 3D tensors, i.e. add a batch dimension)
    lengths = [X.shape[0] for X in features]  # original sequence length for each time series
    if max_len is None:
        max_len = max(lengths)

    X = torch.zeros(batch_size, max_len, features[0].shape[-1])  # (batch_size, padded_length, feat_dim)
    target_masks = torch.zeros_like(X, dtype=torch.bool)  # (batch_size, padded_length, feat_dim) masks related to the objective
    for i in range(batch_size):
        end = min(lengths[i], max_len)
        X[i, :end, :] = features[i][:end, :]
        target_masks[i, :end, :] = masks[i][:end, :]

    targets = X.clone()
    X = X * target_masks  # mask input
    if mask_compensation:
        X = compensate_masking(X, target_masks)

    padding_masks = padding_mask(
        torch.tensor(lengths, dtype=torch.int16), max_len=max_len
    )  # (batch_size, padded_length) boolean tensor, "1" means keep
    # target_masks = ~target_masks  # inverse logic: 0 now means ignore, 1 means predict
    return X, targets, target_masks, padding_masks
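

# Illustrative usage sketch (not part of the original module): driving
# costume_collate with two variable-length series. The helper name
# `_demo_costume_collate`, the feature dimension, lengths and masking ratio
# are all arbitrary example choices; `noise_mask` is defined further below.
def _demo_costume_collate():
    samples = []
    for seq_len in (8, 5):  # two series of different lengths
        x = torch.randn(seq_len, 3)  # (seq_length, feat_dim=3)
        m = torch.from_numpy(noise_mask(x.numpy(), masking_ratio=0.15))
        samples.append((x, m))
    X, targets, target_masks, padding_masks = costume_collate(samples)
    # X is (2, 8, 3) with masked positions zeroed; the second row of
    # padding_masks is [1]*5 + [0]*3, flagging the padded tail of the shorter series.
    return X, targets, target_masks, padding_masks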


def compensate_masking(X, mask):
    """Compensate feature vectors after masking values, so that the matrix product
    W @ X is not affected on average.

    If p is the proportion of unmasked (active) elements,
    X' = X / p = X * feat_dim / num_active.

    Args:
        X: (batch_size, seq_length, feat_dim) torch tensor
        mask: (batch_size, seq_length, feat_dim) torch tensor;
            0 means mask and predict, 1 means unaffected (active) input

    Returns:
        (batch_size, seq_length, feat_dim) compensated features
    """
    # number of unmasked elements of the feature vector at each time step
    num_active = torch.sum(mask, dim=-1).unsqueeze(-1)  # (batch_size, seq_length, 1)
    # avoid division by 0: set the minimum to 1
    num_active = torch.clamp(num_active, min=1)  # (batch_size, seq_length, 1)
    return X.shape[-1] * X / num_active


def padding_mask(lengths, max_len=None):
    """Mask padded positions: creates a (batch_size, max_len) boolean mask from a 1D
    tensor of sequence lengths, where 1 means "keep element at this position" (time step).
    """
    batch_size = lengths.numel()
    max_len = max_len or lengths.max().item()  # default to the longest sequence (None is falsy, so 'or' falls through)
    return (
        torch.arange(0, max_len, device=lengths.device)
        .type_as(lengths)
        .repeat(batch_size, 1)
        .lt(lengths.unsqueeze(1))
    )


def noise_mask(
    X,
    masking_ratio,
    lm=3,
    mode="separate",
    distribution="geometric",
    exclude_feats=None,
):
    """Creates a random boolean mask of the same shape as X, with 0s at places where a
    feature should be masked.

    Args:
        X: (seq_length, feat_dim) numpy array of features corresponding to a single sample
        masking_ratio: proportion of seq_length to be masked. At each time step, this is
            also the proportion of feat_dim that will be masked on average.
        lm: average length of masking subsequences (streaks of 0s). Used only when
            `distribution` is 'geometric'.
        mode: whether each variable should be masked separately ('separate'), or all
            variables at certain positions should be masked concurrently ('concurrent')
        distribution: if 'geometric', sampling follows a Markov chain (and thus is
            stateful), resulting in geometric distributions of masked subsequences with
            a desired mean length `lm`; otherwise, each mask element is sampled
            independently at random (Bernoulli with p = 1 - masking_ratio).
        exclude_feats: iterable of indices corresponding to features to be excluded from
            masking (i.e. to remain all 1s)

    Returns:
        boolean numpy array with the same shape as X, with 0s at places where a feature
        should be masked
    """
    if exclude_feats is not None:
        exclude_feats = set(exclude_feats)

    if distribution == "geometric":  # stateful (Markov chain)
        if mode == "separate":  # each variable (feature) is masked independently
            mask = np.ones(X.shape, dtype=bool)
            for m in range(X.shape[1]):  # feature dimension
                if exclude_feats is None or m not in exclude_feats:
                    mask[:, m] = geom_noise_mask_single(X.shape[0], lm, masking_ratio)  # time dimension
        else:  # replicate across the feature dimension (mask all variables at the same positions concurrently)
            mask = np.tile(
                np.expand_dims(geom_noise_mask_single(X.shape[0], lm, masking_ratio), 1),
                X.shape[1],
            )
    else:  # each position is an independent Bernoulli with p = 1 - masking_ratio
        if mode == "separate":
            mask = np.random.choice(
                np.array([True, False]),
                size=X.shape,
                replace=True,
                p=(1 - masking_ratio, masking_ratio),
            )
        else:
            mask = np.tile(
                np.random.choice(
                    np.array([True, False]),
                    size=(X.shape[0], 1),
                    replace=True,
                    p=(1 - masking_ratio, masking_ratio),
                ),
                X.shape[1],
            )
    return mask
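

# Illustrative sketch (not part of the original module): the two masking helpers
# in isolation. The helper name `_demo_masks` and all values below are arbitrary
# example choices.
def _demo_masks():
    # padding_mask: positions beyond each sequence's length are flagged with 0.
    lengths = torch.tensor([4, 2], dtype=torch.int16)
    pm = padding_mask(lengths, max_len=5)
    # pm == [[1, 1, 1, 1, 0],
    #        [1, 1, 0, 0, 0]]

    # noise_mask in 'separate' mode: each of the 3 features gets its own geometric
    # streaks of 0s, covering ~15% of the 50 time steps on average.
    x = np.zeros((50, 3))  # only the shape of X matters here
    nm = noise_mask(x, masking_ratio=0.15, lm=3, mode="separate")
    return pm, nm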


def geom_noise_mask_single(L, lm, masking_ratio):
    """Randomly create a boolean mask of length `L`, consisting of subsequences of
    average length `lm`, masking with 0s a `masking_ratio` proportion of the sequence.
    The lengths of the masking subsequences and of the intervals between them follow
    geometric distributions.

    Args:
        L: length of mask and sequence to be masked
        lm: average length of masking subsequences (streaks of 0s)
        masking_ratio: proportion of L to be masked

    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)
    p_m = 1 / lm  # probability of each masking sequence stopping; parameter of the geometric distribution
    p_u = p_m * masking_ratio / (1 - masking_ratio)  # probability of each unmasked sequence stopping; parameter of the geometric distribution
    p = [p_m, p_u]

    # Start in the masking state (state 0) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # the state and the masking value corresponding to it happen to be identical
        if np.random.rand() < p[state]:
            state = 1 - state

    return keep_mask


def uniform_noise_mask_single(L, lm, masking_ratio):
    """Randomly create a boolean mask of length `L`, consisting of subsequences of
    average length `lm`, masking parts of the sequence with 0s.
    Both masked and unmasked subsequences use the same flip probability 1/lm, so the
    long-run masked proportion is about 0.5 regardless of `masking_ratio`, which only
    sets the probability of starting in the masking state.

    Args:
        L: length of mask and sequence to be masked
        lm: average length of masking subsequences (streaks of 0s)
        masking_ratio: probability of starting in the masking state

    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)

    # Start in the masking state (state 0) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # the state and the masking value corresponding to it happen to be identical
        if np.random.rand() < 1 / lm:
            state = 1 - state

    return keep_mask
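

# Illustrative sketch (not part of the original module): the stationary behavior of
# geom_noise_mask_single. The two-state Markov chain has stationary masked probability
# p_u / (p_m + p_u) = masking_ratio, so for long sequences the masked fraction
# concentrates around masking_ratio. The helper name `_demo_geom_mask` and the values
# below are arbitrary example choices.
def _demo_geom_mask():
    mask = geom_noise_mask_single(L=10_000, lm=3, masking_ratio=0.25)
    masked_fraction = 1.0 - mask.mean()  # 0s are the masked positions
    # masked_fraction is roughly 0.25, with masked streaks of mean length lm = 3
    return masked_fraction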