Spaces:
Sleeping
Sleeping
import numpy as np | |
from tqdm import tqdm | |
def jitter(x, sigma=0.03): | |
# https://arxiv.org/pdf/1706.00527.pdf | |
return x + np.random.normal(loc=0., scale=sigma, size=x.shape) | |
def scaling(x, sigma=0.1): | |
# https://arxiv.org/pdf/1706.00527.pdf | |
factor = np.random.normal(loc=1., scale=sigma, size=(x.shape[0], x.shape[2])) | |
return np.multiply(x, factor[:, np.newaxis, :]) | |
def rotation(x): | |
x = np.array(x) | |
flip = np.random.choice([-1, 1], size=(x.shape[0], x.shape[2])) | |
rotate_axis = np.arange(x.shape[2]) | |
np.random.shuffle(rotate_axis) | |
return flip[:, np.newaxis, :] * x[:, :, rotate_axis] | |
def permutation(x, max_segments=5, seg_mode="equal"): | |
orig_steps = np.arange(x.shape[1]) | |
num_segs = np.random.randint(1, max_segments, size=(x.shape[0])) | |
ret = np.zeros_like(x) | |
for i, pat in enumerate(x): | |
if num_segs[i] > 1: | |
if seg_mode == "random": | |
split_points = np.random.choice(x.shape[1] - 2, num_segs[i] - 1, replace=False) | |
split_points.sort() | |
splits = np.split(orig_steps, split_points) | |
else: | |
splits = np.array_split(orig_steps, num_segs[i]) | |
warp = np.concatenate(np.random.permutation(splits)).ravel() | |
# ? Question: What is the point of making segments? | |
# for i in range(len(splits)): | |
# permute = np.random.permutation(splits[i]) | |
ret[i] = pat[warp] | |
else: | |
ret[i] = pat | |
return ret | |
def magnitude_warp(x, sigma=0.2, knot=4): | |
from scipy.interpolate import CubicSpline | |
orig_steps = np.arange(x.shape[1]) | |
random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2])) | |
warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T | |
ret = np.zeros_like(x) | |
for i, pat in enumerate(x): | |
warper = np.array( | |
[CubicSpline(warp_steps[:, dim], random_warps[i, :, dim])(orig_steps) for dim in range(x.shape[2])]).T | |
ret[i] = pat * warper | |
return ret | |
def time_warp(x, sigma=0.2, knot=4): | |
from scipy.interpolate import CubicSpline | |
orig_steps = np.arange(x.shape[1]) | |
random_warps = np.random.normal(loc=1.0, scale=sigma, size=(x.shape[0], knot + 2, x.shape[2])) | |
warp_steps = (np.ones((x.shape[2], 1)) * (np.linspace(0, x.shape[1] - 1., num=knot + 2))).T | |
ret = np.zeros_like(x) | |
for i, pat in enumerate(x): | |
for dim in range(x.shape[2]): | |
time_warp = CubicSpline(warp_steps[:, dim], warp_steps[:, dim] * random_warps[i, :, dim])(orig_steps) | |
scale = (x.shape[1] - 1) / time_warp[-1] | |
ret[i, :, dim] = np.interp(orig_steps, np.clip(scale * time_warp, 0, x.shape[1] - 1), pat[:, dim]).T | |
return ret | |
def window_slice(x, reduce_ratio=0.9): | |
# https://halshs.archives-ouvertes.fr/halshs-01357973/document | |
target_len = np.ceil(reduce_ratio * x.shape[1]).astype(int) | |
if target_len >= x.shape[1]: | |
return x | |
starts = np.random.randint(low=0, high=x.shape[1] - target_len, size=(x.shape[0])).astype(int) | |
ends = (target_len + starts).astype(int) | |
ret = np.zeros_like(x) | |
for i, pat in enumerate(x): | |
for dim in range(x.shape[2]): | |
ret[i, :, dim] = np.interp(np.linspace(0, target_len, num=x.shape[1]), np.arange(target_len), | |
pat[starts[i]:ends[i], dim]).T | |
return ret | |
def window_warp(x, window_ratio=0.1, scales=[0.5, 2.]): | |
# https://halshs.archives-ouvertes.fr/halshs-01357973/document | |
warp_scales = np.random.choice(scales, x.shape[0]) | |
warp_size = np.ceil(window_ratio * x.shape[1]).astype(int) | |
window_steps = np.arange(warp_size) | |
window_starts = np.random.randint(low=1, high=x.shape[1] - warp_size - 1, size=(x.shape[0])).astype(int) | |
window_ends = (window_starts + warp_size).astype(int) | |
ret = np.zeros_like(x) | |
for i, pat in enumerate(x): | |
for dim in range(x.shape[2]): | |
start_seg = pat[:window_starts[i], dim] | |
window_seg = np.interp(np.linspace(0, warp_size - 1, num=int(warp_size * warp_scales[i])), window_steps, | |
pat[window_starts[i]:window_ends[i], dim]) | |
end_seg = pat[window_ends[i]:, dim] | |
warped = np.concatenate((start_seg, window_seg, end_seg)) | |
ret[i, :, dim] = np.interp(np.arange(x.shape[1]), np.linspace(0, x.shape[1] - 1., num=warped.size), | |
warped).T | |
return ret | |
def spawner(x, labels, sigma=0.05, verbose=0): | |
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6983028/ | |
# use verbose=-1 to turn off warnings | |
# use verbose=1 to print out figures | |
import utils.dtw as dtw | |
random_points = np.random.randint(low=1, high=x.shape[1] - 1, size=x.shape[0]) | |
window = np.ceil(x.shape[1] / 10.).astype(int) | |
orig_steps = np.arange(x.shape[1]) | |
l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels | |
ret = np.zeros_like(x) | |
# for i, pat in enumerate(tqdm(x)): | |
for i, pat in enumerate(x): | |
# guarentees that same one isnt selected | |
choices = np.delete(np.arange(x.shape[0]), i) | |
# remove ones of different classes | |
choices = np.where(l[choices] == l[i])[0] | |
if choices.size > 0: | |
random_sample = x[np.random.choice(choices)] | |
# SPAWNER splits the path into two randomly | |
path1 = dtw.dtw(pat[:random_points[i]], random_sample[:random_points[i]], dtw.RETURN_PATH, | |
slope_constraint="symmetric", window=window) | |
path2 = dtw.dtw(pat[random_points[i]:], random_sample[random_points[i]:], dtw.RETURN_PATH, | |
slope_constraint="symmetric", window=window) | |
combined = np.concatenate((np.vstack(path1), np.vstack(path2 + random_points[i])), axis=1) | |
if verbose: | |
# print(random_points[i]) | |
dtw_value, cost, DTW_map, path = dtw.dtw(pat, random_sample, return_flag=dtw.RETURN_ALL, | |
slope_constraint=slope_constraint, window=window) | |
dtw.draw_graph1d(cost, DTW_map, path, pat, random_sample) | |
dtw.draw_graph1d(cost, DTW_map, combined, pat, random_sample) | |
mean = np.mean([pat[combined[0]], random_sample[combined[1]]], axis=0) | |
for dim in range(x.shape[2]): | |
ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=mean.shape[0]), | |
mean[:, dim]).T | |
else: | |
# if verbose > -1: | |
# print("There is only one pattern of class {}, skipping pattern average".format(l[i])) | |
ret[i, :] = pat | |
return jitter(ret, sigma=sigma) | |
def wdba(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, verbose=0): | |
# https://ieeexplore.ieee.org/document/8215569 | |
# use verbose = -1 to turn off warnings | |
# slope_constraint is for DTW. "symmetric" or "asymmetric" | |
x = np.array(x) | |
import utils.dtw as dtw | |
if use_window: | |
window = np.ceil(x.shape[1] / 10.).astype(int) | |
else: | |
window = None | |
orig_steps = np.arange(x.shape[1]) | |
l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels | |
ret = np.zeros_like(x) | |
# for i in tqdm(range(ret.shape[0])): | |
for i in range(ret.shape[0]): | |
# get the same class as i | |
choices = np.where(l == l[i])[0] | |
if choices.size > 0: | |
# pick random intra-class pattern | |
k = min(choices.size, batch_size) | |
random_prototypes = x[np.random.choice(choices, k, replace=False)] | |
# calculate dtw between all | |
dtw_matrix = np.zeros((k, k)) | |
for p, prototype in enumerate(random_prototypes): | |
for s, sample in enumerate(random_prototypes): | |
if p == s: | |
dtw_matrix[p, s] = 0. | |
else: | |
dtw_matrix[p, s] = dtw.dtw(prototype, sample, dtw.RETURN_VALUE, | |
slope_constraint=slope_constraint, window=window) | |
# get medoid | |
medoid_id = np.argsort(np.sum(dtw_matrix, axis=1))[0] | |
nearest_order = np.argsort(dtw_matrix[medoid_id]) | |
medoid_pattern = random_prototypes[medoid_id] | |
# start weighted DBA | |
average_pattern = np.zeros_like(medoid_pattern) | |
weighted_sums = np.zeros((medoid_pattern.shape[0])) | |
for nid in nearest_order: | |
if nid == medoid_id or dtw_matrix[medoid_id, nearest_order[1]] == 0.: | |
average_pattern += medoid_pattern | |
weighted_sums += np.ones_like(weighted_sums) | |
else: | |
path = dtw.dtw(medoid_pattern, random_prototypes[nid], dtw.RETURN_PATH, | |
slope_constraint=slope_constraint, window=window) | |
dtw_value = dtw_matrix[medoid_id, nid] | |
warped = random_prototypes[nid, path[1]] | |
weight = np.exp(np.log(0.5) * dtw_value / dtw_matrix[medoid_id, nearest_order[1]]) | |
average_pattern[path[0]] += weight * warped | |
weighted_sums[path[0]] += weight | |
ret[i, :] = average_pattern / weighted_sums[:, np.newaxis] | |
else: | |
# if verbose > -1: | |
# print("There is only one pattern of class {}, skipping pattern average".format(l[i])) | |
ret[i, :] = x[i] | |
return ret | |
# Proposed | |
def random_guided_warp(x, labels, slope_constraint="symmetric", use_window=True, dtw_type="normal", verbose=0): | |
# use verbose = -1 to turn off warnings | |
# slope_constraint is for DTW. "symmetric" or "asymmetric" | |
# dtw_type is for shapeDTW or DTW. "normal" or "shape" | |
import utils.dtw as dtw | |
if use_window: | |
window = np.ceil(x.shape[1] / 10.).astype(int) | |
else: | |
window = None | |
orig_steps = np.arange(x.shape[1]) | |
l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels | |
ret = np.zeros_like(x) | |
# for i, pat in enumerate(tqdm(x)): | |
for i, pat in enumerate(x): | |
# guarentees that same one isnt selected | |
choices = np.delete(np.arange(x.shape[0]), i) | |
# remove ones of different classes | |
choices = np.where(l[choices] == l[i])[0] | |
if choices.size > 0: | |
# pick random intra-class pattern | |
random_prototype = x[np.random.choice(choices)] | |
if dtw_type == "shape": | |
path = dtw.shape_dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, | |
window=window) | |
else: | |
path = dtw.dtw(random_prototype, pat, dtw.RETURN_PATH, slope_constraint=slope_constraint, window=window) | |
# Time warp | |
warped = pat[path[1]] | |
for dim in range(x.shape[2]): | |
ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), | |
warped[:, dim]).T | |
else: | |
# if verbose > -1: | |
# print("There is only one pattern of class {}, skipping timewarping".format(l[i])) | |
ret[i, :] = pat | |
return ret | |
def random_guided_warp_shape(x, labels, slope_constraint="symmetric", use_window=True): | |
return random_guided_warp(x, labels, slope_constraint, use_window, dtw_type="shape") | |
def discriminative_guided_warp(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True, | |
dtw_type="normal", use_variable_slice=True, verbose=0): | |
# use verbose = -1 to turn off warnings | |
# slope_constraint is for DTW. "symmetric" or "asymmetric" | |
# dtw_type is for shapeDTW or DTW. "normal" or "shape" | |
import utils.dtw as dtw | |
if use_window: | |
window = np.ceil(x.shape[1] / 10.).astype(int) | |
else: | |
window = None | |
orig_steps = np.arange(x.shape[1]) | |
l = np.argmax(labels, axis=1) if labels.ndim > 1 else labels | |
positive_batch = np.ceil(batch_size / 2).astype(int) | |
negative_batch = np.floor(batch_size / 2).astype(int) | |
ret = np.zeros_like(x) | |
warp_amount = np.zeros(x.shape[0]) | |
# for i, pat in enumerate(tqdm(x)): | |
for i, pat in enumerate(x): | |
# guarentees that same one isnt selected | |
choices = np.delete(np.arange(x.shape[0]), i) | |
# remove ones of different classes | |
positive = np.where(l[choices] == l[i])[0] | |
negative = np.where(l[choices] != l[i])[0] | |
if positive.size > 0 and negative.size > 0: | |
pos_k = min(positive.size, positive_batch) | |
neg_k = min(negative.size, negative_batch) | |
positive_prototypes = x[np.random.choice(positive, pos_k, replace=False)] | |
negative_prototypes = x[np.random.choice(negative, neg_k, replace=False)] | |
# vector embedding and nearest prototype in one | |
pos_aves = np.zeros((pos_k)) | |
neg_aves = np.zeros((pos_k)) | |
if dtw_type == "shape": | |
for p, pos_prot in enumerate(positive_prototypes): | |
for ps, pos_samp in enumerate(positive_prototypes): | |
if p != ps: | |
pos_aves[p] += (1. / (pos_k - 1.)) * dtw.shape_dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, | |
slope_constraint=slope_constraint, | |
window=window) | |
for ns, neg_samp in enumerate(negative_prototypes): | |
neg_aves[p] += (1. / neg_k) * dtw.shape_dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, | |
slope_constraint=slope_constraint, window=window) | |
selected_id = np.argmax(neg_aves - pos_aves) | |
path = dtw.shape_dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, | |
slope_constraint=slope_constraint, window=window) | |
else: | |
for p, pos_prot in enumerate(positive_prototypes): | |
for ps, pos_samp in enumerate(positive_prototypes): | |
if p != ps: | |
pos_aves[p] += (1. / (pos_k - 1.)) * dtw.dtw(pos_prot, pos_samp, dtw.RETURN_VALUE, | |
slope_constraint=slope_constraint, | |
window=window) | |
for ns, neg_samp in enumerate(negative_prototypes): | |
neg_aves[p] += (1. / neg_k) * dtw.dtw(pos_prot, neg_samp, dtw.RETURN_VALUE, | |
slope_constraint=slope_constraint, window=window) | |
selected_id = np.argmax(neg_aves - pos_aves) | |
path = dtw.dtw(positive_prototypes[selected_id], pat, dtw.RETURN_PATH, | |
slope_constraint=slope_constraint, window=window) | |
# Time warp | |
warped = pat[path[1]] | |
warp_path_interp = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), path[1]) | |
warp_amount[i] = np.sum(np.abs(orig_steps - warp_path_interp)) | |
for dim in range(x.shape[2]): | |
ret[i, :, dim] = np.interp(orig_steps, np.linspace(0, x.shape[1] - 1., num=warped.shape[0]), | |
warped[:, dim]).T | |
else: | |
# if verbose > -1: | |
# print("There is only one pattern of class {}".format(l[i])) | |
ret[i, :] = pat | |
warp_amount[i] = 0. | |
if use_variable_slice: | |
max_warp = np.max(warp_amount) | |
if max_warp == 0: | |
# unchanged | |
ret = window_slice(ret, reduce_ratio=0.9) | |
else: | |
for i, pat in enumerate(ret): | |
# Variable Sllicing | |
ret[i] = window_slice(pat[np.newaxis, :, :], reduce_ratio=0.9 + 0.1 * warp_amount[i] / max_warp)[0] | |
return ret | |
def discriminative_guided_warp_shape(x, labels, batch_size=6, slope_constraint="symmetric", use_window=True): | |
return discriminative_guided_warp(x, labels, batch_size, slope_constraint, use_window, dtw_type="shape") | |
def run_augmentation(x, y, args): | |
print("Augmenting %s" % args.data) | |
np.random.seed(args.seed) | |
x_aug = x | |
y_aug = y | |
if args.augmentation_ratio > 0: | |
augmentation_tags = "%d" % args.augmentation_ratio | |
for n in range(args.augmentation_ratio): | |
x_temp, augmentation_tags = augment(x, y, args) | |
x_aug = np.append(x_aug, x_temp, axis=0) | |
y_aug = np.append(y_aug, y, axis=0) | |
print("Round %d: %s done" % (n, augmentation_tags)) | |
if args.extra_tag: | |
augmentation_tags += "_" + args.extra_tag | |
else: | |
augmentation_tags = args.extra_tag | |
return x_aug, y_aug, augmentation_tags | |
def run_augmentation_single(x, y, args): | |
# print("Augmenting %s"%args.data) | |
np.random.seed(args.seed) | |
x_aug = x | |
y_aug = y | |
if len(x.shape) < 3: | |
# Augmenting on the entire series: using the input data as "One Big Batch" | |
# Before - (sequence_length, num_channels) | |
# After - (1, sequence_length, num_channels) | |
# Note: the 'sequence_length' here is actually the length of the entire series | |
x_input = x[np.newaxis, :] | |
elif len(x.shape) == 3: | |
# Augmenting on the batch series: keep current dimension (batch_size, sequence_length, num_channels) | |
x_input = x | |
else: | |
raise ValueError("Input must be (batch_size, sequence_length, num_channels) dimensional") | |
if args.augmentation_ratio > 0: | |
augmentation_tags = "%d" % args.augmentation_ratio | |
for n in range(args.augmentation_ratio): | |
x_aug, augmentation_tags = augment(x_input, y, args) | |
# print("Round %d: %s done"%(n, augmentation_tags)) | |
if args.extra_tag: | |
augmentation_tags += "_" + args.extra_tag | |
else: | |
augmentation_tags = args.extra_tag | |
if (len(x.shape) < 3): | |
# Reverse to two-dimensional in whole series augmentation scenario | |
x_aug = x_aug.squeeze(0) | |
return x_aug, y_aug, augmentation_tags | |
def augment(x, y, args): | |
import utils.augmentation as aug | |
augmentation_tags = "" | |
if args.jitter: | |
x = aug.jitter(x) | |
augmentation_tags += "_jitter" | |
if args.scaling: | |
x = aug.scaling(x) | |
augmentation_tags += "_scaling" | |
if args.rotation: | |
x = aug.rotation(x) | |
augmentation_tags += "_rotation" | |
if args.permutation: | |
x = aug.permutation(x) | |
augmentation_tags += "_permutation" | |
if args.randompermutation: | |
x = aug.permutation(x, seg_mode="random") | |
augmentation_tags += "_randomperm" | |
if args.magwarp: | |
x = aug.magnitude_warp(x) | |
augmentation_tags += "_magwarp" | |
if args.timewarp: | |
x = aug.time_warp(x) | |
augmentation_tags += "_timewarp" | |
if args.windowslice: | |
x = aug.window_slice(x) | |
augmentation_tags += "_windowslice" | |
if args.windowwarp: | |
x = aug.window_warp(x) | |
augmentation_tags += "_windowwarp" | |
if args.spawner: | |
x = aug.spawner(x, y) | |
augmentation_tags += "_spawner" | |
if args.dtwwarp: | |
x = aug.random_guided_warp(x, y) | |
augmentation_tags += "_rgw" | |
if args.shapedtwwarp: | |
x = aug.random_guided_warp_shape(x, y) | |
augmentation_tags += "_rgws" | |
if args.wdba: | |
x = aug.wdba(x, y) | |
augmentation_tags += "_wdba" | |
if args.discdtw: | |
x = aug.discriminative_guided_warp(x, y) | |
augmentation_tags += "_dgw" | |
if args.discsdtw: | |
x = aug.discriminative_guided_warp_shape(x, y) | |
augmentation_tags += "_dgws" | |
return x, augmentation_tags |