# TSEditor/utils/masking_utils.py
# Web-page header residue kept for provenance: "PeterYu's picture", "update", "2875fe6".
import torch
import numpy as np
import torch.nn.functional as F
def costume_collate(data, max_len=None, mask_compensation=False):
    """Build mini-batch tensors from a list of (X, mask) tuples and mask the input.

    Args:
        data: len(batch_size) list of tuples (X, mask).
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - mask: boolean torch tensor of shape (seq_length, feat_dim); variable seq_length.
        max_len: global fixed sequence length. Used for architectures requiring fixed length input,
            where the batch length cannot vary dynamically. Longer sequences are clipped, shorter are
            padded with 0s. If None, the longest sequence in the batch is used.
        mask_compensation: if True, the masked input is rescaled with `compensate_masking`.

    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of masked features (input)
        targets: (batch_size, padded_length, feat_dim) torch tensor of unmasked features (output)
        target_masks: (batch_size, padded_length, feat_dim) boolean torch tensor
            0 indicates masked values to be predicted, 1 indicates unaffected/"active" feature values
        padding_masks: (batch_size, padded_length) boolean tensor, 1 means keep vector at this position,
            0 ignore (padding)

    Raises:
        ValueError: if `data` is empty.
    """
    if not data:
        raise ValueError("costume_collate requires a non-empty batch")

    batch_size = len(data)
    features, masks = zip(*data)

    # Original sequence length of each time series.
    lengths = [X.shape[0] for X in features]
    if max_len is None:
        max_len = max(lengths)

    # Stack and pad features and masks (convert 2D to 3D tensors, i.e. add batch dimension).
    X = torch.zeros(batch_size, max_len, features[0].shape[-1])
    target_masks = torch.zeros_like(X, dtype=torch.bool)
    for i, (feat, feat_mask) in enumerate(zip(features, masks)):
        end = min(lengths[i], max_len)  # clip sequences longer than max_len
        X[i, :end, :] = feat[:end, :]
        target_masks[i, :end, :] = feat_mask[:end, :]

    targets = X.clone()
    X = X * target_masks  # zero out the values that have to be predicted
    if mask_compensation:
        X = compensate_masking(X, target_masks)

    # int64 rather than int16: int16 cannot represent lengths above 32767.
    padding_masks = padding_mask(
        torch.tensor(lengths, dtype=torch.int64), max_len=max_len
    )  # (batch_size, padded_length) boolean tensor, "1" means keep
    return X, targets, target_masks, padding_masks
def compensate_masking(X, mask):
    """Rescale feature vectors after masking so that W @ X is unaffected on average.

    If p is the proportion of unmasked (active) elements, X' = X / p = X * feat_dim / num_active.

    Args:
        X: (batch_size, seq_length, feat_dim) torch tensor
        mask: (batch_size, seq_length, feat_dim) torch tensor: 0s means mask and predict,
            1s: unaffected (active) input

    Returns:
        (batch_size, seq_length, feat_dim) compensated features
    """
    # Number of unmasked elements of the feature vector at each time step: (batch_size, seq_length, 1).
    num_active = torch.sum(mask, dim=-1, keepdim=True)
    # Avoid division by 0 for fully masked time steps. clamp works in place of an explicit
    # ones tensor, which would be created on the default device and could mismatch `mask`.
    num_active = num_active.clamp(min=1)
    return X.shape[-1] * X / num_active
def padding_mask(lengths, max_len=None):
    """Create a boolean mask marking the non-padded positions of each sequence.

    Args:
        lengths: 1-D tensor of sequence lengths, one entry per batch element.
        max_len: number of time steps in the mask. When None (or 0), the largest value in
            `lengths` is used.

    Returns:
        (batch_size, max_len) boolean tensor where 1 means keep the element at this position
        (time step) and 0 means the position is padding.
    """
    batch_size = lengths.numel()
    if not max_len:
        # Tensors expose `max()`, not `max_val()`; an empty batch yields a zero-width mask.
        max_len = int(lengths.max()) if batch_size > 0 else 0
    # Positions stay in the default integer dtype so a narrow `lengths` dtype (e.g. int16)
    # cannot make the position index wrap around; the comparison promotes dtypes.
    positions = torch.arange(max_len, device=lengths.device)
    return positions.unsqueeze(0).lt(lengths.unsqueeze(1))
def noise_mask(
    X,
    masking_ratio,
    lm=3,
    mode="separate",
    distribution="geometric",
    exclude_feats=None,
):
    """Create a random boolean mask of the same shape as X, with 0s where a feature should be masked.

    Args:
        X: (seq_length, feat_dim) numpy array of features corresponding to a single sample
        masking_ratio: proportion of seq_length to be masked. At each time step, will also be the
            proportion of feat_dim that will be masked on average
        lm: average length of masking subsequences (streaks of 0s). Used only when `distribution`
            is 'geometric'.
        mode: whether each variable should be masked separately ('separate'), or all variables at
            certain positions should be masked concurrently (any other value, e.g. 'concurrent')
        distribution: 'geometric' samples each mask sequence from a Markov chain (stateful),
            resulting in geometric distributions of masked subsequences of mean length `lm`;
            any other value samples each mask element independently at random
        exclude_feats: iterable of indices corresponding to features to be excluded from masking
            (i.e. to remain all 1s). Honoured for every mode/distribution combination; indices
            outside range(feat_dim) are ignored.

    Returns:
        boolean numpy array with the same shape as X, with 0s at places where a feature should be masked
    """
    seq_len, feat_dim = X.shape[0], X.shape[1]
    excluded = set(exclude_feats) if exclude_feats is not None else set()

    if distribution == "geometric":  # stateful (Markov chain)
        if mode == "separate":  # each variable (feature) is independent
            mask = np.ones(X.shape, dtype=bool)
            for m in range(feat_dim):
                # Excluded features are skipped so they do not consume random numbers.
                if m not in excluded:
                    mask[:, m] = geom_noise_mask_single(seq_len, lm, masking_ratio)
        else:  # replicate across feature dimension (mask all variables at the same positions)
            mask = np.tile(
                np.expand_dims(
                    geom_noise_mask_single(seq_len, lm, masking_ratio), 1
                ),
                feat_dim,
            )
    else:  # each position is independent Bernoulli with p = 1 - masking_ratio
        keep_probs = (1 - masking_ratio, masking_ratio)
        if mode == "separate":
            mask = np.random.choice(
                np.array([True, False]),
                size=X.shape,
                replace=True,
                p=keep_probs,
            )
        else:
            mask = np.tile(
                np.random.choice(
                    np.array([True, False]),
                    size=(seq_len, 1),
                    replace=True,
                    p=keep_probs,
                ),
                feat_dim,
            )

    # Force excluded features to stay unmasked regardless of how the mask was sampled.
    kept_cols = [m for m in range(feat_dim) if m in excluded]
    if kept_cols:
        mask[:, kept_cols] = True
    return mask
def geom_noise_mask_single(L, lm, masking_ratio):
    """Randomly create a boolean mask of length `L` made of geometric-length streaks.

    The mask consists of subsequences of average length `lm`, masking with 0s a `masking_ratio`
    proportion of the sequence. The lengths of masking subsequences and of the intervals between
    them follow geometric distributions (a two-state Markov chain).

    Args:
        L: length of mask and sequence to be masked
        lm: average length of masking subsequences (streaks of 0s)
        masking_ratio: proportion of L to be masked

    Returns:
        (L,) boolean numpy array intended to mask ('drop') with 0s a sequence of length L
    """
    if masking_ratio >= 1:
        # Everything is masked; this also avoids dividing by (1 - masking_ratio) == 0 below.
        return np.zeros(L, dtype=bool)

    keep_mask = np.ones(L, dtype=bool)
    # Probability of each masking sequence stopping: parameter of the geometric distribution.
    p_m = 1 / lm
    # Probability of each unmasked sequence stopping, chosen so that the expected masked
    # proportion equals masking_ratio.
    p_u = p_m * masking_ratio / (1 - masking_ratio)
    p = [p_m, p_u]

    # Start in state 0 with masking_ratio probability; state 0 means masking, 1 means not masking.
    state = int(np.random.rand() > masking_ratio)
    for i in range(L):
        # The state and the mask value corresponding to that state happen to be identical.
        keep_mask[i] = state
        if np.random.rand() < p[state]:
            state = 1 - state
    return keep_mask
def uniform_noise_mask_single(L, lm, masking_ratio):
    """Randomly create a boolean mask of length `L` from a symmetric two-state chain.

    The chain starts in the masking state with probability `masking_ratio`. After every
    position it flips between masking and keeping with probability 1 / lm, whichever state
    it is currently in.

    NOTE(review): `masking_ratio` only influences the initial state here, and nothing in the
    code draws streak lengths from a uniform distribution as the function name suggests;
    confirm the intended behaviour with the author.

    Args:
        L: length of mask and sequence to be masked
        lm: reciprocal of the per-position probability of switching state
        masking_ratio: probability that the first position is masked

    Returns:
        (L,) boolean numpy array with 0s at the positions to mask
    """
    # 0 means masking, 1 means keeping; the state doubles as the mask value itself.
    current = 1 if np.random.rand() > masking_ratio else 0
    flags = []
    for _ in range(L):
        flags.append(current)
        flip = np.random.rand() < 1 / lm
        if flip:
            current ^= 1
    return np.array(flags, dtype=bool)