import numpy as np
import torch


def costume_collate(data, max_len=None, mask_compensation=False):
    """Build mini-batch tensors from a list of (X, mask) tuples: mask the input and
    create the padding masks.

    Args:
        data: list (of length batch_size) of tuples (X, mask).
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - mask: boolean torch tensor of shape (seq_length, feat_dim); variable seq_length.
        max_len: global fixed sequence length. Used for architectures requiring fixed-length input,
            where the batch length cannot vary dynamically. Longer sequences are clipped, shorter
            ones are padded with 0s.
        mask_compensation: if True, rescale feature vectors to compensate for the zeroed-out
            (masked) values (see `compensate_masking`).

    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of masked features (input)
        targets: (batch_size, padded_length, feat_dim) torch tensor of unmasked features (output)
        target_masks: (batch_size, padded_length, feat_dim) boolean torch tensor;
            0 indicates masked values to be predicted, 1 indicates unaffected/"active" feature values
        padding_masks: (batch_size, padded_length) boolean tensor; 1 means keep vector at this
            position, 0 means ignore (padding)
    """
    batch_size = len(data)
    features, masks = zip(*data)

    # Stack and pad features and masks (convert 2D to 3D tensors, i.e. add batch dimension)
    lengths = [X.shape[0] for X in features]  # original sequence length for each time series
    if max_len is None:
        max_len = max(lengths)
    X = torch.zeros(batch_size, max_len, features[0].shape[-1])  # (batch_size, padded_length, feat_dim)
    target_masks = torch.zeros_like(X, dtype=torch.bool)  # (batch_size, padded_length, feat_dim) masks related to objective
    for i in range(batch_size):
        end = min(lengths[i], max_len)
        X[i, :end, :] = features[i][:end, :]
        target_masks[i, :end, :] = masks[i][:end, :]

    targets = X.clone()
    X = X * target_masks  # mask input
    if mask_compensation:
        X = compensate_masking(X, target_masks)

    padding_masks = padding_mask(
        torch.tensor(lengths, dtype=torch.int16), max_len=max_len
    )  # (batch_size, padded_length) boolean tensor, "1" means keep
    # target_masks = ~target_masks  # inverse logic: 0 now means ignore, 1 means predict
    return X, targets, target_masks, padding_masks
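
# A minimal usage sketch (an assumption, not part of the original module): `pair_dataset`
# is a hypothetical torch Dataset whose __getitem__ returns an (X, mask) tuple, e.g. with
# the mask produced by noise_mask() below. costume_collate then plugs in as collate_fn:
#
#   from functools import partial
#   from torch.utils.data import DataLoader
#
#   loader = DataLoader(pair_dataset, batch_size=32,
#                       collate_fn=partial(costume_collate, max_len=100))
#   X, targets, target_masks, padding_masks = next(iter(loader))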


def compensate_masking(X, mask):
    """
    Compensate feature vectors after masking values, such that the matrix product W @ X is
    unaffected on average. If p is the proportion of unmasked (active) elements,
    X' = X / p = X * feat_dim / num_active.

    Args:
        X: (batch_size, seq_length, feat_dim) torch tensor
        mask: (batch_size, seq_length, feat_dim) torch tensor; 0 means masked (to be predicted),
            1 means unaffected (active) input
    Returns:
        (batch_size, seq_length, feat_dim) compensated features
    """
    # number of unmasked elements of the feature vector at each time step
    num_active = torch.sum(mask, dim=-1).unsqueeze(-1)  # (batch_size, seq_length, 1)
    # clamp to a minimum of 1 to avoid division by 0
    num_active = torch.clamp(num_active, min=1)  # (batch_size, seq_length, 1)
    return X.shape[-1] * X / num_active
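
# Worked example: with feat_dim = 3 and a time step where mask = [1, 1, 0], two of three
# features remain active, so num_active = 2 and the row is scaled by 3/2:
#   X = [1.0, 2.0, 0.0] (after masking) -> X' = [1.5, 3.0, 0.0]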


def padding_mask(lengths, max_len=None):
    """
    Used to mask padded positions: creates a (batch_size, max_len) boolean mask from a tensor of
    sequence lengths, where 1 means keep the element at this position (time step).
    """
    batch_size = lengths.numel()
    max_len = max_len or lengths.max()  # trick works because of the overloading of the 'or' operator for non-boolean types
    return (
        torch.arange(0, max_len, device=lengths.device)
        .type_as(lengths)
        .repeat(batch_size, 1)
        .lt(lengths.unsqueeze(1))
    )
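
# Example: for lengths = torch.tensor([2, 4]) and max_len = 5, padding_mask returns
#   [[True, True, False, False, False],
#    [True, True, True,  True,  False]]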


def noise_mask(
    X,
    masking_ratio,
    lm=3,
    mode="separate",
    distribution="geometric",
    exclude_feats=None,
):
    """
    Creates a random boolean mask of the same shape as X, with 0s at places where a feature should be masked.

    Args:
        X: (seq_length, feat_dim) numpy array of features corresponding to a single sample
        masking_ratio: proportion of seq_length to be masked. At each time step, this will also be
            the proportion of feat_dim that is masked on average.
        lm: average length of masking subsequences (streaks of 0s). Used only when `distribution` is 'geometric'.
        mode: whether each variable should be masked separately ('separate'), or all variables at
            certain positions should be masked concurrently ('concurrent')
        distribution: whether each mask sequence element is sampled independently at random, or whether
            sampling follows a Markov chain (and thus is stateful), resulting in geometric distributions of
            masked subsequences of a desired mean length `lm`
        exclude_feats: iterable of indices corresponding to features to be excluded from masking (i.e. to remain all 1s)

    Returns:
        boolean numpy array with the same shape as X, with 0s at places where a feature should be masked
    """
    if exclude_feats is not None:
        exclude_feats = set(exclude_feats)

    if distribution == "geometric":  # stateful (Markov chain)
        if mode == "separate":  # each variable (feature) is masked independently
            mask = np.ones(X.shape, dtype=bool)
            for m in range(X.shape[1]):  # feature dimension
                if exclude_feats is None or m not in exclude_feats:
                    mask[:, m] = geom_noise_mask_single(X.shape[0], lm, masking_ratio)  # time dimension
        else:  # replicate across the feature dimension (mask all variables at the same positions concurrently)
            mask = np.tile(
                np.expand_dims(geom_noise_mask_single(X.shape[0], lm, masking_ratio), 1),
                X.shape[1],
            )
    else:  # each position is an independent Bernoulli with p = 1 - masking_ratio
        if mode == "separate":
            mask = np.random.choice(
                np.array([True, False]),
                size=X.shape,
                replace=True,
                p=(1 - masking_ratio, masking_ratio),
            )
        else:
            mask = np.tile(
                np.random.choice(
                    np.array([True, False]),
                    size=(X.shape[0], 1),
                    replace=True,
                    p=(1 - masking_ratio, masking_ratio),
                ),
                X.shape[1],
            )
    return mask
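
# Usage example: mask ~15% of values per feature with geometric streaks of mean length 3,
# then zero out the masked entries (X_np is assumed to be a (seq_length, feat_dim) array):
#
#   X_np = np.random.randn(50, 4)
#   mask = noise_mask(X_np, masking_ratio=0.15, lm=3, mode="separate")
#   X_masked = X_np * mask  # masked positions become 0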


def geom_noise_mask_single(L, lm, masking_ratio):
    """
    Randomly create a boolean mask of length `L`, consisting of subsequences of average length `lm`,
    masking with 0s a `masking_ratio` proportion of the sequence. The lengths of the masking
    subsequences and of the intervals between them follow a geometric distribution.

    Args:
        L: length of mask and sequence to be masked
        lm: average length of masking subsequences (streaks of 0s)
        masking_ratio: proportion of L to be masked
    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)
    p_m = 1 / lm  # probability of each masking sequence stopping; parameter of the geometric distribution
    p_u = p_m * masking_ratio / (1 - masking_ratio)  # probability of each unmasked sequence stopping; chosen so the masked proportion is masking_ratio
    p = [p_m, p_u]

    # Start in state 0 (masking) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # here the state and the mask value corresponding to it happen to coincide
        if np.random.rand() < p[state]:
            state = 1 - state
    return keep_mask
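
# Sanity check: with p_u = p_m * r / (1 - r), the chain's stationary masked fraction is
# p_u / (p_m + p_u) = r, so for long L the masked proportion approaches masking_ratio:
#
#   m = geom_noise_mask_single(100_000, lm=3, masking_ratio=0.15)
#   (~m).mean()  # ~0.15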


def uniform_noise_mask_single(L, lm, masking_ratio):
    """
    Randomly create a boolean mask of length `L`, consisting of alternating masked and unmasked
    subsequences. The state switches with probability 1/lm at every step, so the lengths of both
    masked and unmasked streaks are geometrically distributed with mean `lm`; despite the
    function's name, the streak lengths are not uniformly distributed. Note also that
    `masking_ratio` only sets the probability of starting in the masking state: the long-run
    masked proportion is ~0.5 regardless of its value.

    Args:
        L: length of mask and sequence to be masked
        lm: average length of masking subsequences (streaks of 0s)
        masking_ratio: probability of starting in the masking state
    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    keep_mask = np.ones(L, dtype=bool)
    # Start in state 0 (masking) with probability masking_ratio
    state = int(np.random.rand() > masking_ratio)  # state 0 means masking, 1 means not masking
    for i in range(L):
        keep_mask[i] = state  # here the state and the mask value corresponding to it happen to coincide
        if np.random.rand() < 1 / lm:
            state = 1 - state
    return keep_mask
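

if __name__ == "__main__":
    # Quick smoke test (an illustrative sketch, not part of the original module): build two
    # variable-length samples, generate geometric noise masks for them, and collate a batch.
    samples = [torch.randn(5, 4), torch.randn(8, 4)]
    batch = [
        (X, torch.from_numpy(noise_mask(X.numpy(), masking_ratio=0.25, lm=3)))
        for X in samples
    ]
    X, targets, target_masks, padding_masks = costume_collate(batch)
    # Expected shapes: (2, 8, 4), (2, 8, 4), (2, 8, 4), (2, 8)
    print(X.shape, targets.shape, target_masks.shape, padding_masks.shape)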