import os import torch import numpy as np import pandas as pd from scipy import io from sklearn.preprocessing import MinMaxScaler from torch.utils.data import Dataset from models.model_utils import normalize_to_neg_one_to_one, unnormalize_to_zero_to_one from utils.masking_utils import noise_mask class CustomDataset(Dataset): def __init__( self, name, data_root, window=64, proportion=0.8, save2npy=True, neg_one_to_one=True, seed=123, period="train", output_dir="./OUTPUT", predict_length=None, missing_ratio=None, style="separate", distribution="geometric", mean_mask_length=3, ): super(CustomDataset, self).__init__() assert period in ["train", "test"], "period must be train or test." if period == "train": assert not (predict_length is not None or missing_ratio is not None), "" self.period = period self.name = name self.pred_len = predict_length self.missing_ratio = missing_ratio self.style = style self.distribution = distribution self.mean_mask_length = mean_mask_length self.rawdata, self.scaler = self.read_data(data_root, self.name) self.dir = os.path.join(output_dir, "samples") os.makedirs(self.dir, exist_ok=True) self.window, self.period = window, period self.len, self.var_num = self.rawdata.shape[0], self.rawdata.shape[-1] self.sample_num_total = max(self.len - self.window + 1, 0) self.save2npy = save2npy self.auto_norm = neg_one_to_one self.data = self.__normalize(self.rawdata) train, inference = self.getsamples(self.data, proportion, seed) self.samples = train if period == "train" else inference if period == "test": if missing_ratio is not None: self.masking = self.mask_data(seed) elif predict_length is not None: masks = np.ones(self.samples.shape) masks[:, -predict_length:, :] = 0 self.masking = masks.astype(bool) else: raise NotImplementedError() self.sample_num = self.samples.shape[0] print(f"Dataset load from {data_root} with shape {self.samples.shape}") def getsamples(self, data, proportion, seed): x = np.zeros((self.sample_num_total, self.window, self.var_num)) for i in range(self.sample_num_total): start = i end = i + self.window x[i, :, :] = data[start:end, :] train_data, test_data = self.divide(x, proportion, seed) if self.save2npy: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_ground_truth_{self.window}_test.npy" ), self.unnormalize(test_data), ) np.save( os.path.join( self.dir, f"{self.name}_ground_truth_{self.window}_train.npy" ), self.unnormalize(train_data), ) if self.auto_norm: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_test.npy" ), unnormalize_to_zero_to_one(test_data), ) np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_train.npy" ), unnormalize_to_zero_to_one(train_data), ) else: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_test.npy" ), test_data, ) np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_train.npy" ), train_data, ) return train_data, test_data def normalize(self, sq): d = sq.reshape(-1, self.var_num) d = self.scaler.transform(d) if self.auto_norm: d = normalize_to_neg_one_to_one(d) return d.reshape(-1, self.window, self.var_num) def unnormalize(self, sq): d = self.__unnormalize(sq.reshape(-1, self.var_num)) return d.reshape(-1, self.window, self.var_num) def __normalize(self, rawdata): data = self.scaler.transform(rawdata) if self.auto_norm: data = normalize_to_neg_one_to_one(data) return data def __unnormalize(self, data): if self.auto_norm: data = unnormalize_to_zero_to_one(data) x = data return self.scaler.inverse_transform(x) @staticmethod def divide(data, ratio, seed=2023): size = data.shape[0] # Store the state of the RNG to restore later. st0 = np.random.get_state() np.random.seed(seed) regular_train_num = int(np.ceil(size * ratio)) id_rdm = np.random.permutation(size) # id_rdm = np.arange(size) regular_train_id = id_rdm[:regular_train_num] irregular_train_id = id_rdm[regular_train_num:] regular_data = data[regular_train_id, :] irregular_data = data[irregular_train_id, :] # Restore RNG. np.random.set_state(st0) return regular_data, irregular_data @staticmethod def read_data(filepath, name=""): """Reads a single .csv""" df = pd.read_csv(filepath, header=0) if name == "etth": df.drop(df.columns[0], axis=1, inplace=True) data = df.values scaler = MinMaxScaler() scaler = scaler.fit(data) return data, scaler def mask_data(self, seed=2023): masks = np.ones_like(self.samples) # Store the state of the RNG to restore later. st0 = np.random.get_state() np.random.seed(seed) for idx in range(self.samples.shape[0]): x = self.samples[idx, :, :] # (seq_length, feat_dim) array mask = noise_mask( x, self.missing_ratio, self.mean_mask_length, self.style, self.distribution, ) # (seq_length, feat_dim) boolean array masks[idx, :, :] = mask if self.save2npy: np.save( os.path.join(self.dir, f"{self.name}_masking_{self.window}.npy"), masks ) # Restore RNG. np.random.set_state(st0) return masks.astype(bool) def __getitem__(self, ind): if self.period == "test": x = self.samples[ind, :, :] # (seq_length, feat_dim) array m = self.masking[ind, :, :] # (seq_length, feat_dim) boolean array return torch.from_numpy(x).float(), torch.from_numpy(m) x = self.samples[ind, :, :] # (seq_length, feat_dim) array return torch.from_numpy(x).float() def __len__(self): return self.sample_num class RevenueDataset(CustomDataset): def __init__( self, name, data_root, window=64, proportion=0.8, save2npy=True, neg_one_to_one=True, seed=123, period="train", output_dir="./OUTPUT", predict_length=None, missing_ratio=None, style="separate", distribution="geometric", mean_mask_length=3, ): super(CustomDataset, self).__init__() assert period in ["train", "test"], "period must be train or test." if period == "train": assert not (predict_length is not None or missing_ratio is not None), "" self.period = period self.name = name self.pred_len = predict_length self.missing_ratio = missing_ratio self.style = style self.distribution = distribution self.mean_mask_length = mean_mask_length self.dir = os.path.join(output_dir, "samples") os.makedirs(self.dir, exist_ok=True) self.window, self.period = window, period # self.len, self.var_num = self.rawdata.shape[0], self.rawdata.shape[-1] self.rawdata, self.scaler = self.read_data(data_root, self.name) self.len = len(self.rawdata) // self.window self.var_num = 3 self.sample_num_total = self.len # self.sample_num_total = max(self.len - self.window + 1, 0) self.save2npy = save2npy self.auto_norm = neg_one_to_one self.data = self.__normalize(self.rawdata) train, inference = self.getsamples(self.data, proportion, seed) self.samples = train if period == "train" else inference if period == "test": if missing_ratio is not None: self.masking = self.mask_data(seed) elif predict_length is not None: masks = np.ones(self.samples.shape) masks[:, -predict_length:, :] = 0 self.masking = masks.astype(bool) else: raise NotImplementedError() self.sample_num = self.samples.shape[0] print(f"Dataset load from {data_root} with shape {self.samples.shape}") # @staticmethod def read_data(self, filepath, name=""): """Reads a single .csv""" df = pd.read_csv(filepath) min_max_scale = lambda series: (series - series.min()) / ( series.max() - series.min() ) mean_std_scale = lambda series: (series - series.mean()) / series.std() moving_average = lambda series: series.rolling(window=7, min_periods=1).mean() for variable in ["revenue", "download", "au"]: df[variable] = df.groupby("app_id")[variable].transform(min_max_scale) # data = df.groupby("app_id").first(min_count=self.window).values # get the first window days of each app after sorting by date data = ( df.groupby("app_id").head(self.window)[["download", "revenue", "au"]].values ) # print(data.shape, self.window) # print(self.window * len(df["app_id"].unique())) scaler = MinMaxScaler() scaler = scaler.fit(data) return data, scaler def __normalize(self, rawdata): data = self.scaler.transform(rawdata) if self.auto_norm: data = normalize_to_neg_one_to_one(data) return data def __unnormalize(self, data): if self.auto_norm: data = unnormalize_to_zero_to_one(data) x = data return self.scaler.inverse_transform(x) class ControlRevenueDataset(RevenueDataset): def getsamples(self, data, proportion, seed): x = np.zeros((self.sample_num_total, self.window, self.var_num)) for i in range(self.sample_num_total): start = i end = i + self.window x[i, :, :] = data[start:end, :] train_data, test_data = self.divide(x, proportion, seed) # print("Origin split, train ", train_data.shape, "; test", test_data.shape) # print(train_data.max(), train_data.min()) import random # data agumentation # plt five times aug data[0] with 3 channel # import matplotlib.pyplot as plt # sub = plt.subplot(111) # sub.plot(train_data[0, :, 0]) # sub.plot(train_data[0, :, 1]) # sub.plot(train_data[0, :, 2]) # plt.show() aug_data = [] # for delta in np.linspace(-.3, 0.5, 4): for delta in np.linspace(-0.3, 0.3, 4): print(delta) random.seed(2023) tmp = train_data.copy() tmp[:, :, 0] += np.random.normal(delta, 2, tmp.shape[1]) / 10 tmp[:, :, 1] += np.random.normal(delta, 0.15, tmp.shape[1]) / 10 tmp[:, :, 2] += np.random.normal(delta, 0.1, tmp.shape[1]) / 10 for c in range(3): # min max resacele tmp[:, :, c] = ( (tmp[:, :, c] - tmp[:, :, c].min()) / (tmp[:, :, c].max() - tmp[:, :, c].min()) - 0.5 ) * 2 aug_data.append(tmp) # sub = plt.subplot(111) # sub.plot(tmp[0, :, 0]) # sub.plot(tmp[0, :, 1]) # sub.plot(tmp[0, :, 2]) # plt.show() train_data = np.concatenate([train_data] + aug_data, axis=0).clip(-1, 1) if self.save2npy: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_ground_truth_{self.window}_test.npy" ), self.unnormalize(test_data), ) np.save( os.path.join( self.dir, f"{self.name}_ground_truth_{self.window}_train.npy" ), self.unnormalize(train_data), ) if self.auto_norm: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_test.npy" ), unnormalize_to_zero_to_one(test_data), ) np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_train.npy" ), unnormalize_to_zero_to_one(train_data), ) else: if 1 - proportion > 0: np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_test.npy" ), test_data, ) np.save( os.path.join( self.dir, f"{self.name}_norm_truth_{self.window}_train.npy" ), train_data, ) # print("Split, train ", train_data.shape, "; test", test_data.shape) return train_data, test_data def __normalize(self, rawdata): data = self.scaler.transform(rawdata) if self.auto_norm: data = normalize_to_neg_one_to_one(data) return data def __unnormalize(self, data): if self.auto_norm: data = unnormalize_to_zero_to_one(data) x = data return self.scaler.inverse_transform(x) class fMRIDataset(CustomDataset): def __init__(self, proportion=1.0, **kwargs): super().__init__(proportion=proportion, **kwargs) @staticmethod def read_data(filepath, name=""): """Reads a single .csv""" data = io.loadmat(filepath + "/sim4.mat")["ts"] scaler = MinMaxScaler() scaler = scaler.fit(data) return data, scaler