Spaces:
Sleeping
Sleeping
import os | |
import numpy as np | |
import pandas as pd | |
import torch | |
def collate_fn(data, max_len=None):
    """Build padded mini-batch tensors from a list of (X, y) samples.

    Args:
        data: len(batch_size) list of tuples (X, y).
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - y: torch tensor of shape (num_labels,) : class indices or numerical targets
              (for classification or regression, respectively). num_labels > 1 for multi-task models
        max_len: global fixed sequence length. Used for architectures requiring fixed length input,
            where the batch length cannot vary dynamically. Longer sequences are clipped, shorter are
            padded with 0s. If None, the longest sequence in the batch determines the padded length.
    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of zero-padded features
        targets: (batch_size, num_labels) torch tensor obtained by stacking the per-sample labels
        padding_masks: (batch_size, padded_length) boolean tensor, 1 means keep vector at this position,
            0 means padding
    Raises:
        ValueError: if `data` is empty.
    """
    if not data:
        # zip(*data) on an empty batch would fail with an opaque unpacking error
        raise ValueError("collate_fn received an empty batch")

    features, labels = zip(*data)
    batch_size = len(features)

    # Stack and pad features (convert 2D to 3D tensors, i.e. add batch dimension)
    lengths = [X.shape[0] for X in features]  # original sequence length for each time series
    if max_len is None:
        max_len = max(lengths)

    X = torch.zeros(batch_size, max_len, features[0].shape[-1])  # (batch_size, padded_length, feat_dim)
    for i, (sample, length) in enumerate(zip(features, lengths)):
        end = min(length, max_len)  # clip sequences longer than max_len
        X[i, :end, :] = sample[:end, :]

    targets = torch.stack(labels, dim=0)  # (batch_size, num_labels)

    # 64-bit lengths: a 16-bit integer cannot represent sequence lengths above 32767
    padding_masks = padding_mask(torch.tensor(lengths, dtype=torch.long),
                                 max_len=max_len)  # (batch_size, padded_length) boolean tensor, "1" means keep

    return X, targets, padding_masks
def padding_mask(lengths, max_len=None):
    """
    Used to mask padded positions: creates a (batch_size, max_len) boolean mask from a tensor of sequence lengths,
    where 1 means keep element at this position (time step)

    Args:
        lengths: 1D tensor holding one sequence length per batch element.
        max_len: width of the mask. When omitted (or 0), the largest value in `lengths` is used.
    Returns:
        Boolean tensor of shape (batch_size, max_len), located on the same device as `lengths`;
        entry [i, t] is True iff t < lengths[i].
    """
    if not max_len:
        # fall back to the longest sequence in the batch
        max_len = int(lengths.max())
    positions = torch.arange(max_len, device=lengths.device)  # (max_len,) time-step indices
    # Broadcast (1, max_len) against (batch_size, 1); the index tensor keeps its default integer
    # dtype, so large max_len values cannot wrap around when `lengths` uses a narrow dtype.
    return positions.unsqueeze(0) < lengths.unsqueeze(1)
class Normalizer(object):
    """
    Normalizes a dataframe, either across ALL contained rows (time steps) or separately for each sample,
    depending on `norm_type`.
    """

    def __init__(self, norm_type='standardization', mean=None, std=None, min_val=None, max_val=None):
        """
        Args:
            norm_type: choose from:
                "standardization", "minmax": normalizes dataframe across ALL contained rows (time steps)
                "per_sample_std", "per_sample_minmax": normalizes each sample separately (i.e. across only its own rows)
            mean, std, min_val, max_val: optional (num_feat,) Series of pre-computed values
        """
        self.norm_type = norm_type
        self.mean = mean
        self.std = std
        self.min_val = min_val
        self.max_val = max_val

    def normalize(self, df):
        """
        Args:
            df: input dataframe. For the "per_sample_*" modes, rows sharing the same index value
                are treated as one sample.
        Returns:
            df: normalized dataframe
        Raises:
            NameError: if `norm_type` is not one of the supported methods.
        """
        eps = np.finfo(float).eps  # keeps the division finite for constant features

        if self.norm_type == "standardization":
            # Fill in each missing statistic independently, so a single pre-computed value
            # is respected and the other one is still derived from `df`.
            if self.mean is None:
                self.mean = df.mean()
            if self.std is None:
                self.std = df.std()
            return (df - self.mean) / (self.std + eps)

        elif self.norm_type == "minmax":
            if self.max_val is None:
                self.max_val = df.max()
            if self.min_val is None:
                self.min_val = df.min()
            return (df - self.min_val) / (self.max_val - self.min_val + eps)

        elif self.norm_type == "per_sample_std":
            grouped = df.groupby(by=df.index)
            # eps added for consistency with the other modes: a constant feature has std 0
            return (df - grouped.transform('mean')) / (grouped.transform('std') + eps)

        elif self.norm_type == "per_sample_minmax":
            grouped = df.groupby(by=df.index)
            min_vals = grouped.transform('min')
            return (df - min_vals) / (grouped.transform('max') - min_vals + eps)

        else:
            raise NameError(f'Normalize method "{self.norm_type}" not implemented')
def interpolate_missing(y):
    """
    Fills NaN entries of pd.Series `y` by linear interpolation (applied in both directions).
    A Series without NaN values is returned unchanged.
    """
    has_gaps = y.isna().any()
    if not has_gaps:
        return y
    return y.interpolate(method='linear', limit_direction='both')
def subsample(y, limit=256, factor=2):
    """
    Returns `y` unchanged when it has at most `limit` elements; otherwise keeps every
    `factor`-th element (starting from the first) and renumbers the index from 0.
    """
    if len(y) <= limit:
        return y
    thinned = y[::factor]
    return thinned.reset_index(drop=True)