# Page metadata captured with this file -- Spaces status: Running; file size: 10,760 bytes; revision: 67069a4.
# (The line-number gutter "1 ... 316" from the web file viewer was dropped; it is not part of the source.)
import math
import numpy as np
import typing as T
import seaborn as sns
import matplotlib.pyplot as plt
import sklearn
import sklearn.manifold
import tensorflow as tf
import numpy.typing as npt
from tensorflow import keras
from tensorflow.python.types.core import TensorLike
# Type alias for array-like data: either a TensorFlow tensor or a NumPy array.
Tensor = T.Union[tf.Tensor, npt.NDArray]
# Same as ``Tensor``, but ``None`` is also allowed.
OptTensor = T.Optional[Tensor]
EPS = 1e-18  # tiny offset added to the min-max denominator so it is never exactly zero


class TSFeatureScaler:
    """Global min-max scaler for time series.

    One minimum and one maximum are taken over the whole fitted input. Data
    are first mapped to [0, 1] with those two values and then stretched to
    [-1, 1].
    """

    def __init__(self) -> None:
        # Both values stay ``None`` until ``fit`` has been called.
        self.min_val = None
        self.max_val = None

    def fit(self, X: TensorLike) -> "TSFeatureScaler":
        """Record the global minimum and maximum of ``X``.

        Args:
            X: Input tensor of shape [N, T, D]
               (N: samples, T: timesteps, D: features).

        Returns:
            The scaler itself, so calls can be chained.
        """
        # Global extrema over the entire dataset (no per-feature statistics).
        self.min_val = np.min(X)
        self.max_val = np.max(X)
        return self

    def transform(self, X: TensorLike) -> TensorLike:
        """Map ``X`` to [-1, 1] using the fitted global range.

        Raises:
            ValueError: if the scaler has not been fitted yet.
        """
        not_fitted = self.min_val is None or self.max_val is None
        if not_fitted:
            raise ValueError("Scaler must be fitted before transform")

        span = self.max_val - self.min_val + EPS
        # Step 1: min-max scale to [0, 1].
        unit_interval = (X - self.min_val) / span
        # Step 2: stretch [0, 1] to [-1, 1].
        return 2.0 * unit_interval - 1.0

    def inverse_transform(self, X: TensorLike) -> TensorLike:
        """Undo ``transform``: map values from [-1, 1] back to the fitted range.

        Raises:
            ValueError: if the scaler has not been fitted yet.
        """
        not_fitted = self.min_val is None or self.max_val is None
        if not_fitted:
            raise ValueError("Scaler must be fitted before inverse_transform")

        # Step 1: from [-1, 1] back to [0, 1].
        unit_interval = (X + 1.0) / 2.0
        # Step 2: from [0, 1] back to the original range.
        span = self.max_val - self.min_val + EPS
        return unit_interval * span + self.min_val

    def fit_transform(self, X: TensorLike) -> TensorLike:
        """Fit the scaler on ``X`` and return the transformed ``X``."""
        self.fit(X)
        return self.transform(X)

    def get_range(self) -> T.Tuple[float, float]:
        """Return the fitted ``(min, max)`` of the original data.

        Raises:
            ValueError: if the scaler has not been fitted yet.
        """
        not_fitted = self.min_val is None or self.max_val is None
        if not_fitted:
            raise ValueError("Scaler must be fitted first")
        return self.min_val, self.max_val
EPS = 1e-18  # tiny offset added to the per-feature denominator so it is never exactly zero


class TSFeatureWiseScaler():
    """Per-feature min-max scaler for time series shaped N x T x D.

    Each feature (index along the last axis) gets its own minimum and maximum,
    taken over all samples and timesteps, and is mapped linearly onto
    ``feature_range``.
    """

    def __init__(self, feature_range: T.Tuple[float, float] = (0, 1)) -> None:
        """Store the target ``(low, high)`` range used by ``transform``."""
        assert len(feature_range) == 2
        self._min_v, self._max_v = feature_range

    # X: N x T x D
    def fit(self, X: TensorLike) -> "TSFeatureWiseScaler":
        """Record the minimum and maximum of every feature of ``X``.

        Args:
            X: data shaped N x T x D (samples, timesteps, features).

        Returns:
            The scaler itself, so calls can be chained.
        """
        D = X.shape[2]
        self.mins = np.zeros(D)
        self.maxs = np.zeros(D)
        for i in range(D):
            self.mins[i] = np.min(X[:, :, i])
            self.maxs[i] = np.max(X[:, :, i])
        return self

    def transform(self, X: TensorLike) -> TensorLike:
        """Scale every feature of ``X`` from its fitted range onto ``feature_range``."""
        unit_interval = (X - self.mins) / (self.maxs - self.mins + EPS)
        return unit_interval * (self._max_v - self._min_v) + self._min_v

    def inverse_transform(self, X: TensorLike) -> TensorLike:
        """Map values from ``feature_range`` back to the fitted per-feature ranges.

        The result is a new array and ``X`` itself is left untouched. The
        computation is deliberately not done in place: in-place operators
        would overwrite the caller's data and fail on integer arrays.
        """
        unit_interval = (X - self._min_v) / (self._max_v - self._min_v)
        return unit_interval * (self.maxs - self.mins + EPS) + self.mins

    def fit_transform(self, X: TensorLike) -> TensorLike:
        """Fit the scaler on ``X`` and return the transformed ``X``."""
        self.fit(X)
        return self.transform(X)
def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=0.99):
    """Return ``timesteps`` evenly spaced float32 betas from ``beta_start`` to ``beta_end`` (both included)."""
    return np.linspace(
        start=beta_start,
        stop=beta_end,
        num=timesteps,
        dtype=np.float32,
    )
def cosine_beta_schedule(timesteps, s=0.008):
    """Return ``timesteps`` betas derived from a squared-cosine cumulative-alpha curve.

    The curve is evaluated on ``timesteps + 1`` evenly spaced points, normalised
    by its first value, and each beta is one minus the ratio of consecutive
    curve values, clipped to [0, 0.999]. ``s`` is the offset added to the
    normalised time before the cosine is taken.
    """
    grid = np.linspace(0, timesteps, timesteps + 1, dtype=np.float64)
    phase = ((grid / timesteps) + s) / (1 + s) * math.pi * 0.5
    cumulative_alpha = np.cos(phase) ** 2
    cumulative_alpha = cumulative_alpha / cumulative_alpha[0]
    step_ratio = cumulative_alpha[1:] / cumulative_alpha[:-1]
    return np.clip(1 - step_ratio, 0, 0.999)
def reconstruction_loss_by_axis(original: tf.Tensor, reconstructed: tf.Tensor, axis: int = 0) -> tf.Tensor:
    """
    Compute a reconstruction loss between two tensors, selected by ``axis``.

    Parameters:
    ----------
    original : tf.Tensor
        The original data tensor.
    reconstructed : tf.Tensor
        The reconstructed data tensor, e.g. the output of an autoencoder.
    axis : int, optional (default=0)
        Selects how the loss is computed:
        - 0: sum of squared element-wise differences over all elements.
        - any other value (1 or 2 are the intended choices): both tensors are
          averaged along ``axis`` and the two means are compared with
          ``tf.losses.mean_squared_error``.

    Returns:
    -------
    tf.Tensor
        The computed reconstruction loss.
    """
    if axis == 0:
        # Element-wise comparison: total squared error over the whole tensor.
        squared_error = tf.math.squared_difference(original, reconstructed)
        return tf.reduce_sum(squared_error)

    # Compare the averages taken along the requested axis instead of raw values.
    original_mean = tf.reduce_mean(original, axis=axis)
    reconstructed_mean = tf.reduce_mean(reconstructed, axis=axis)
    return tf.losses.mean_squared_error(original_mean, reconstructed_mean)
def gen_sine_dataset(N: int, T: int, D: int, max_value: int = 10) -> npt.NDArray:
    """Generate ``N`` random multi-feature sine series as an array shaped (N, T, D).

    Every sample draws one amplitude in [0, max_value) and one phase shift in
    [1, max_value + 1); both are shared by all of its features. Feature number
    ``d`` (counted from 1) is ``amplitude * sin((d + 3) * t / 25 + shift)`` for
    ``t = 0 .. T - 1``.
    """
    timesteps = np.arange(0, T, 1)
    samples = []
    for _ in range(N):
        # Draw order (amplitude, then shift) matters for seeded reproducibility.
        amplitude = np.random.random() * max_value
        phase_shift = np.random.random() * max_value + 1
        samples.append(
            [amplitude * np.sin((d + 3) * timesteps / 25. + phase_shift) for d in range(1, D + 1)]
        )
    # Samples are collected as (N, D, T); swap the last two axes to get (N, T, D).
    return np.transpose(np.array(samples), [0, 2, 1])
def gen_sine_vs_const_dataset(N: int, T: int, D: int, max_value: int = 10, const: int = 0) -> T.Tuple[TensorLike, TensorLike]:
    """Generate a two-class dataset of sine series (label 0) and constant series (label 1).

    Each of the ``N`` samples is, with probability 0.5, either a sine wave with
    random per-feature amplitude and phase, or a series that is constant over
    time with a random per-feature level in [0, const).

    Returns:
        A pair ``(X, y)``: ``X`` stacks the samples (each T x D) and ``y``
        holds the matching 0/1 labels.
    """
    series, labels = [], []
    for _ in range(N):
        # All five draws happen for every sample, in this order, whichever
        # class ends up being generated.
        amplitudes = np.random.random(D) * max_value
        levels = np.random.random(D) * const
        phases = np.random.random(D) * 2
        frequency = np.random.random()
        is_sine = np.random.random() < 0.5

        if not is_sine:
            # Constant class: repeat the per-feature levels for every timestep.
            series.append(np.tile(levels, (T, 1)))
            labels.append(1)
            continue

        # Sine class: a (T, D) grid whose rows are the timestep index divided by 10.
        time_grid = np.repeat(np.arange(0, T, 1)[:, None], D, axis=1) / 10
        series.append(np.sin(frequency * time_grid + phases) * amplitudes)
        labels.append(0)
    return np.array(series), np.array(labels)
def visualize_ts_lineplot(
    ts: Tensor,
    ys: OptTensor = None,
    num: int = 5,
    unite_features: bool = True,
) -> None:
    """Draw line plots for ``num`` randomly chosen samples of a 3-D time-series tensor.

    One subplot per chosen sample is stacked vertically. Samples are picked
    without replacement, so ``num`` must not exceed the number of samples.

    :param ts: Time series with three dimensions, indexed as
        ``ts[sample, timestep, feature]``.
    :param ys: Optional targets. If 1-D, ``ys[sample]`` becomes the subplot
        title; if 2-D, ``ys[sample]`` is drawn in green on a twin y-axis.
    :param num: Number of samples (and subplots) to draw.
    :param unite_features: If True, every feature of a sample is drawn on its
        subplot; otherwise a single randomly chosen feature is drawn.
    :raises ValueError: if ``ys`` has more than two dimensions.

    The figure is only built here; nothing is shown or saved.
    """
    assert len(ts.shape) == 3

    _, axs = plt.subplots(num, 1, figsize=(14, 10))
    if num == 1:
        # With a single subplot matplotlib returns a bare Axes, not an array.
        axs = [axs]

    time_axis = range(ts.shape[1])
    chosen_ids = np.random.choice(ts.shape[0], size=num, replace=False)
    for ax, sample_id in zip(axs, chosen_ids):
        if unite_features:
            for feat_id in range(ts.shape[2]):
                sns.lineplot(x=time_axis, y=ts[sample_id, :, feat_id], ax=ax)
        else:
            feature_id = np.random.randint(ts.shape[2])
            sns.lineplot(
                x=time_axis,
                y=ts[sample_id, :, feature_id],
                ax=ax,
                label=rf"feature \#{feature_id}",
            )

        if ys is None:
            continue

        target_rank = len(ys.shape)
        if target_rank == 1:
            ax.set_title(ys[sample_id])
        elif target_rank == 2:
            sns.lineplot(
                x=time_axis,
                y=ys[sample_id],
                ax=ax.twinx(),
                color="g",
                label="Target variable",
            )
        else:
            raise ValueError("ys contains too many dimensions")
def visualize_tsne(
    X: Tensor,
    y: Tensor,
    X_gen: Tensor,
    y_gen: Tensor,
    path: str = "/tmp/tsne_embeddings.pdf",
    feature_averaging: bool = False,
    perplexity: float = 30.0
) -> None:
    """
    Visualizes t-SNE embeddings of real and synthetic data.

    Real and generated samples are embedded together into two dimensions and
    drawn as a scatter plot. Marker colour encodes the class (the argmax of
    the one-hot label) and marker shape tells real ("hist") samples from
    generated ("gen") ones. The plot is saved to ``path`` and then shown.

    :param X: The original real data tensor of shape
        (num_samples, num_timesteps, num_features).
    :type X: Tensor
    :param y: One-hot labels of the real data, shape (num_samples, num_classes).
    :type y: Tensor
    :param X_gen: The generated synthetic data tensor, same layout as ``X``.
    :type X_gen: Tensor
    :param y_gen: One-hot labels of the generated data, same layout as ``y``.
    :type y_gen: Tensor
    :param path: Where the figure is saved. Defaults to "/tmp/tsne_embeddings.pdf".
    :type path: str, optional
    :param feature_averaging: If True, every sample is averaged over its last
        axis before embedding; otherwise each sample is flattened. Defaults to False.
    :type feature_averaging: bool, optional
    :param perplexity: Perplexity forwarded to ``sklearn.manifold.TSNE``.
        Defaults to 30.0.
    :type perplexity: float, optional
    """
    tsne = sklearn.manifold.TSNE(n_components=2, perplexity=perplexity, learning_rate="auto", init="random")

    if feature_averaging:
        X_all = np.concatenate((np.mean(X, axis=2), np.mean(X_gen, axis=2)))
    else:
        X_all = np.concatenate((X, X_gen))
    # One row per sample. reshape keeps every value exactly once, whereas
    # np.resize would silently repeat or truncate data on a size mismatch.
    X_emb = tsne.fit_transform(np.reshape(X_all, (X_all.shape[0], -1)))

    y_all = np.concatenate((y, y_gen))
    class_ids = np.argmax(y_all, axis=1)
    # Build the legend label from the class index so any number of classes
    # works; classes 0 and 1 keep their labels "class 0" / "class 1".
    hue_labels = [f"class {class_id}" for class_id in class_ids]

    point_styles = ["hist"] * X.shape[0] + ["gen"] * X_gen.shape[0]

    plt.figure(figsize=(8, 6), dpi=80)
    sns.scatterplot(
        x=X_emb[:, 0],
        y=X_emb[:, 1],
        hue=hue_labels,
        style=point_styles,
        markers={"hist": "<", "gen": "H"},
        alpha=0.7,
    )
    plt.legend()
    plt.box(False)
    plt.axis("off")
    plt.savefig(path)
    plt.show()
|