from abc import ABC, abstractmethod
import os
import json

import numpy as np
from scipy import stats
from scipy.special import softmax  # used by the fixing-invariant metrics below
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import cosine

from singleVis.eval.evaluate import *
from singleVis.backend import *
from singleVis.utils import is_B, js_div
from singleVis.visualizer import visualizer


class EvaluatorAbstractClass(ABC):
    def __init__(self, data_provider, projector, *args, **kwargs):
        self.data_provider = data_provider
        self.projector = projector

    @abstractmethod
    def eval_nn_train(self, epoch, n_neighbors):
        pass

    @abstractmethod
    def eval_nn_test(self, epoch, n_neighbors):
        pass

    @abstractmethod
    def eval_inv_train(self, epoch):
        pass

    @abstractmethod
    def eval_inv_test(self, epoch):
        pass

    @abstractmethod
    def save_epoch_eval(self, n_epoch, file_name="evaluation"):
        pass

    @abstractmethod
    def get_eval(self, file_name="evaluation"):
        pass


class Evaluator(EvaluatorAbstractClass):
    def __init__(self, data_provider, projector, verbose=1):
        self.data_provider = data_provider
        self.projector = projector
        self.verbose = verbose

    def eval_nn_train(self, epoch, n_neighbors):
        train_data = self.data_provider.train_representation(epoch)
        train_data = train_data.reshape(len(train_data), -1)
        embedding = self.projector.batch_project(epoch, train_data)
        val = evaluate_proj_nn_perseverance_knn(train_data, embedding, n_neighbors=n_neighbors, metric="euclidean")
        if self.verbose:
            print("#train# nn preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
        return val

    def eval_nn_test(self, epoch, n_neighbors):
        train_data = self.data_provider.train_representation(epoch)
        train_data = train_data.reshape(len(train_data), -1)
        test_data = self.data_provider.test_representation(epoch)
        test_data = test_data.reshape(len(test_data), -1)
        fitting_data = np.concatenate((train_data, test_data), axis=0)
        embedding = self.projector.batch_project(epoch, fitting_data)
        val = evaluate_proj_nn_perseverance_knn(fitting_data, embedding, n_neighbors=n_neighbors, metric="euclidean")
        if self.verbose:
            print("#test# nn preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
        return val
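
    # Usage sketch (hedged): assuming `data_provider` exposes train_representation(epoch)/
    # test_representation(epoch) and `projector` exposes batch_project(epoch, data),
    # neighbor preservation at epoch 10 with 15 neighbors could be computed as:
    #
    #   evaluator = Evaluator(data_provider, projector, verbose=1)
    #   nn_score = evaluator.eval_nn_train(10, n_neighbors=15)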

    def eval_b_train(self, epoch, n_neighbors):
        train_data = self.data_provider.train_representation(epoch)
        train_data = train_data.reshape(len(train_data), -1)
        border_centers = self.data_provider.border_representation(epoch)
        border_centers = border_centers.reshape(len(border_centers), -1)
        low_center = self.projector.batch_project(epoch, border_centers)
        low_train = self.projector.batch_project(epoch, train_data)

        val = evaluate_proj_boundary_perseverance_knn(train_data,
                                                      low_train,
                                                      border_centers,
                                                      low_center,
                                                      n_neighbors=n_neighbors)
        if self.verbose:
            print("#train# boundary preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
        return val

    def eval_b_test(self, epoch, n_neighbors):
        test_data = self.data_provider.test_representation(epoch)
        test_data = test_data.reshape(len(test_data), -1)
        border_centers = self.data_provider.test_border_representation(epoch)
        border_centers = border_centers.reshape(len(border_centers), -1)

        low_center = self.projector.batch_project(epoch, border_centers)
        low_test = self.projector.batch_project(epoch, test_data)

        val = evaluate_proj_boundary_perseverance_knn(test_data,
                                                      low_test,
                                                      border_centers,
                                                      low_center,
                                                      n_neighbors=n_neighbors)
        if self.verbose:
            print("#test# boundary preserving: {:.2f}/{:d} in epoch {:d}".format(val, n_neighbors, epoch))
        return val

    def eval_inv_train(self, epoch):
        train_data = self.data_provider.train_representation(epoch)
        embedding = self.projector.batch_project(epoch, train_data)
        inv_data = self.projector.batch_inverse(epoch, embedding)

        pred = self.data_provider.get_pred(epoch, train_data).argmax(axis=1)
        new_pred = self.data_provider.get_pred(epoch, inv_data).argmax(axis=1)

        val = evaluate_inv_accu(pred, new_pred)
        if self.verbose:
            print("#train# PPR: {:.2f} in epoch {:d}".format(val, epoch))
        return val

    def eval_inv_test(self, epoch):
        test_data = self.data_provider.test_representation(epoch)
        embedding = self.projector.batch_project(epoch, test_data)
        inv_data = self.projector.batch_inverse(epoch, embedding)

        pred = self.data_provider.get_pred(epoch, test_data).argmax(axis=1)
        new_pred = self.data_provider.get_pred(epoch, inv_data).argmax(axis=1)

        val = evaluate_inv_accu(pred, new_pred)
        if self.verbose:
            print("#test# PPR: {:.2f} in epoch {:d}".format(val, epoch))
        return val
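
    # PPR (prediction preserving ratio): project a sample to 2-D, invert it back to the
    # representation space, and check whether the subject model still predicts the same
    # label; evaluate_inv_accu is assumed to return the fraction of unchanged predictions,
    # i.e. roughly np.mean(pred == new_pred).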

    def eval_inv_dist_train(self, epoch):
        train_data = self.data_provider.train_representation(epoch)
        embedding = self.projector.batch_project(epoch, train_data)
        inv_data = self.projector.batch_inverse(epoch, embedding)
        dist = np.linalg.norm(train_data - inv_data, axis=1).mean()

        if self.verbose:
            print("#train# inverse projection distance: {:.2f} in epoch {:d}".format(dist, epoch))
        return float(dist)

    def eval_inv_dist_test(self, epoch):
        test_data = self.data_provider.test_representation(epoch)
        embedding = self.projector.batch_project(epoch, test_data)
        inv_data = self.projector.batch_inverse(epoch, embedding)
        dist = np.linalg.norm(test_data - inv_data, axis=1).mean()
        if self.verbose:
            print("#test# inverse projection distance: {:.2f} in epoch {:d}".format(dist, epoch))
        return float(dist)

    def eval_temporal_train(self, n_neighbors):
        eval_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p
        l = self.data_provider.train_num

        alpha = np.zeros((eval_num, l))
        delta_x = np.zeros((eval_num, l))

        for t in range(eval_num):
            prev_data = self.data_provider.train_representation(t * self.data_provider.p + self.data_provider.s)
            prev_embedding = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, prev_data)

            curr_data = self.data_provider.train_representation((t + 1) * self.data_provider.p + self.data_provider.s)
            curr_embedding = self.projector.batch_project((t + 1) * self.data_provider.p + self.data_provider.s, curr_data)

            alpha_ = find_neighbor_preserving_rate(prev_data, curr_data, n_neighbors=n_neighbors)
            delta_x_ = np.linalg.norm(prev_embedding - curr_embedding, axis=1)

            alpha[t] = alpha_
            delta_x[t] = delta_x_

        val_corr, corr_std = evaluate_proj_temporal_perseverance_corr(alpha, delta_x)
        if self.verbose:
            print("Temporal preserving (train): {:.3f}\tstd: {:.3f}".format(val_corr, corr_std))
        return val_corr, corr_std

    def eval_temporal_test(self, n_neighbors):
        eval_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p
        l = self.data_provider.train_num + self.data_provider.test_num

        alpha = np.zeros((eval_num, l))
        delta_x = np.zeros((eval_num, l))
        for t in range(eval_num):
            prev_data_test = self.data_provider.test_representation(t * self.data_provider.p + self.data_provider.s)
            prev_data_train = self.data_provider.train_representation(t * self.data_provider.p + self.data_provider.s)
            prev_data = np.concatenate((prev_data_train, prev_data_test), axis=0)
            prev_embedding = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, prev_data)

            curr_data_test = self.data_provider.test_representation((t + 1) * self.data_provider.p + self.data_provider.s)
            curr_data_train = self.data_provider.train_representation((t + 1) * self.data_provider.p + self.data_provider.s)
            curr_data = np.concatenate((curr_data_train, curr_data_test), axis=0)
            curr_embedding = self.projector.batch_project((t + 1) * self.data_provider.p + self.data_provider.s, curr_data)

            alpha_ = find_neighbor_preserving_rate(prev_data, curr_data, n_neighbors=n_neighbors)
            delta_x_ = np.linalg.norm(prev_embedding - curr_embedding, axis=1)

            alpha[t] = alpha_
            delta_x[t] = delta_x_

        val_corr, corr_std = evaluate_proj_temporal_perseverance_corr(alpha, delta_x)
        if self.verbose:
            print("Temporal preserving (test): {:.3f}\tstd: {:.3f}".format(val_corr, corr_std))
        return val_corr, corr_std
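
    # Temporal preservation correlates, per sample and per pair of consecutive
    # checkpoints (epochs s, s+p, ..., e), the high-dimensional neighbor preserving
    # rate alpha with the low-dimensional displacement delta_x: samples whose
    # neighborhoods change a lot should also move a lot in the visualization.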

    def eval_temporal_nn_train(self, epoch, n_neighbors):
        epoch_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p + 1
        l = self.data_provider.train_num
        high_dists = np.zeros((l, epoch_num))
        low_dists = np.zeros((l, epoch_num))

        curr_data = self.data_provider.train_representation(epoch)
        curr_embedding = self.projector.batch_project(epoch, curr_data)

        for t in range(epoch_num):
            data = self.data_provider.train_representation(t * self.data_provider.p + self.data_provider.s)
            embedding = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, data)

            high_dist = np.linalg.norm(curr_data - data, axis=1)
            low_dist = np.linalg.norm(curr_embedding - embedding, axis=1)
            high_dists[:, t] = high_dist
            low_dists[:, t] = low_dist

        high_orders = np.argsort(high_dists, axis=1)
        low_orders = np.argsort(low_dists, axis=1)

        # skip rank 0 (the query epoch itself) and keep the n_neighbors closest epochs
        high_rankings = high_orders[:, 1:n_neighbors + 1]
        low_rankings = low_orders[:, 1:n_neighbors + 1]

        corr = np.zeros(l)
        for i in range(l):
            corr[i] = len(np.intersect1d(high_rankings[i], low_rankings[i]))

        if self.verbose:
            print("Temporal nn preserving (train) for epoch {} with {} neighbors: {:.3f}\tstd: {:.3f}".format(epoch, n_neighbors, corr.mean(), corr.std()))
        return float(corr.mean())

    def eval_temporal_nn_test(self, epoch, n_neighbors):
        epoch_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p + 1
        l = self.data_provider.test_num
        high_dists = np.zeros((l, epoch_num))
        low_dists = np.zeros((l, epoch_num))

        curr_data = self.data_provider.test_representation(epoch)
        curr_embedding = self.projector.batch_project(epoch, curr_data)

        for t in range(epoch_num):
            data = self.data_provider.test_representation(t * self.data_provider.p + self.data_provider.s)
            embedding = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, data)

            high_dist = np.linalg.norm(curr_data - data, axis=1)
            low_dist = np.linalg.norm(curr_embedding - embedding, axis=1)
            high_dists[:, t] = high_dist
            low_dists[:, t] = low_dist

        high_orders = np.argsort(high_dists, axis=1)
        low_orders = np.argsort(low_dists, axis=1)

        high_rankings = high_orders[:, 1:n_neighbors + 1]
        low_rankings = low_orders[:, 1:n_neighbors + 1]

        corr = np.zeros(l)
        for i in range(l):
            corr[i] = len(np.intersect1d(high_rankings[i], low_rankings[i]))

        if self.verbose:
            print("Temporal nn preserving (test) for epoch {} with {} neighbors: {:.3f}\tstd: {:.3f}".format(epoch, n_neighbors, corr.mean(), corr.std()))
        return float(corr.mean())

    def eval_spatial_temporal_nn_train(self, n_neighbors, feature_dim):
        """Evaluate whether the visualization model preserves the ranking of close
        spatial and temporal neighbors across all training checkpoints."""
        epoch_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p + 1
        train_num = self.data_provider.train_num

        high_features = np.zeros((epoch_num * train_num, feature_dim))
        low_features = np.zeros((epoch_num * train_num, 2))

        for t in range(epoch_num):
            data = self.data_provider.train_representation(t * self.data_provider.p + self.data_provider.s)
            high_features[t * train_num:(t + 1) * train_num] = np.copy(data)
            low_features[t * train_num:(t + 1) * train_num] = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, data)

        val = evaluate_proj_nn_perseverance_knn(high_features, low_features, n_neighbors)

        if self.verbose:
            print("Spatial/Temporal nn preserving (train):\t{:.3f}/{:d}".format(val, n_neighbors))
        return val

    def eval_spatial_temporal_nn_test(self, n_neighbors, feature_dim):
        epoch_num = (self.data_provider.e - self.data_provider.s) // self.data_provider.p + 1
        train_num = self.data_provider.train_num
        test_num = self.data_provider.test_num
        num = train_num + test_num

        high_features = np.zeros((epoch_num * num, feature_dim))
        low_features = np.zeros((epoch_num * num, 2))

        for t in range(epoch_num):
            train_data = self.data_provider.train_representation(t * self.data_provider.p + self.data_provider.s)
            test_data = self.data_provider.test_representation(t * self.data_provider.p + self.data_provider.s)
            data = np.concatenate((train_data, test_data), axis=0)
            low_features[t * num:(t + 1) * num] = self.projector.batch_project(t * self.data_provider.p + self.data_provider.s, data)
            high_features[t * num:(t + 1) * num] = np.copy(data)

        val = evaluate_proj_nn_perseverance_knn(high_features, low_features, n_neighbors)

        if self.verbose:
            print("Spatial/Temporal nn preserving (test):\t{:.3f}/{:d}".format(val, n_neighbors))
        return val
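
    # Both spatial/temporal variants stack the representations of every checkpoint into a
    # single (epoch_num * sample_num, feature_dim) matrix, so the kNN preservation score
    # is computed over spatial and temporal neighbors jointly rather than per epoch.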

    def eval_temporal_global_corr_train(self, epoch, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p

        LEN = self.data_provider.train_num
        EPOCH = (end - start) // period + 1
        repr_dim = self.data_provider.representation_dim
        all_train_repr = np.zeros((EPOCH, LEN, repr_dim))
        low_repr = np.zeros((EPOCH, LEN, 2))

        for i in range(start, end + 1, period):
            index = (i - start) // period
            all_train_repr[index] = self.data_provider.train_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

        corrs = np.zeros(LEN)
        ps = np.zeros(LEN)
        for i in range(LEN):
            high_embeddings = all_train_repr[:, i, :].squeeze()
            low_embeddings = low_repr[:, i, :].squeeze()

            high_dists = np.linalg.norm(high_embeddings - high_embeddings[(epoch - start) // period], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[(epoch - start) // period], axis=1)

            corr, p = stats.spearmanr(high_dists, low_dists)
            corrs[i] = corr
            ps[i] = p
        return corrs.mean()

    def eval_temporal_global_corr_test(self, epoch, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p
        TEST_LEN = self.data_provider.test_num
        EPOCH = (end - start) // period + 1
        repr_dim = self.data_provider.representation_dim

        all_test_repr = np.zeros((EPOCH, TEST_LEN, repr_dim))
        low_repr = np.zeros((EPOCH, TEST_LEN, 2))
        for i in range(start, end + 1, period):
            index = (i - start) // period
            all_test_repr[index] = self.data_provider.test_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

        corrs = np.zeros(TEST_LEN)
        ps = np.zeros(TEST_LEN)
        e = (epoch - start) // period
        for i in range(TEST_LEN):
            high_embeddings = all_test_repr[:, i, :].squeeze()
            low_embeddings = low_repr[:, i, :].squeeze()
            high_dists = np.linalg.norm(high_embeddings - high_embeddings[e], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[e], axis=1)
            corr, p = stats.spearmanr(high_dists, low_dists)
            corrs[i] = corr
            ps[i] = p
        return corrs.mean()

    def eval_temporal_weighted_global_corr_train(self, epoch, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p

        LEN = self.data_provider.train_num
        EPOCH = (end - start) // period + 1
        repr_dim = self.data_provider.representation_dim
        all_train_repr = np.zeros((EPOCH, LEN, repr_dim))
        low_repr = np.zeros((EPOCH, LEN, 2))

        for i in range(start, end + 1, period):
            index = (i - start) // period
            all_train_repr[index] = self.data_provider.train_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

        corrs = np.zeros(LEN)
        for i in range(LEN):
            high_embeddings = all_train_repr[:, i, :].squeeze()
            low_embeddings = low_repr[:, i, :].squeeze()

            high_dists = np.linalg.norm(high_embeddings - high_embeddings[(epoch - start) // period], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[(epoch - start) // period], axis=1)

            high_ranking = np.argsort(high_dists)
            low_ranking = np.argsort(low_dists)

            corr = evaluate_proj_temporal_weighted_global_corr(high_ranking, low_ranking)
            corrs[i] = corr
        return corrs.mean()

    def eval_temporal_weighted_global_corr_test(self, epoch, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p
        TEST_LEN = self.data_provider.test_num
        EPOCH = (end - start) // period + 1
        repr_dim = self.data_provider.representation_dim

        all_test_repr = np.zeros((EPOCH, TEST_LEN, repr_dim))
        low_repr = np.zeros((EPOCH, TEST_LEN, 2))
        for i in range(start, end + 1, period):
            index = (i - start) // period
            all_test_repr[index] = self.data_provider.test_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

        corrs = np.zeros(TEST_LEN)
        e = (epoch - start) // period
        for i in range(TEST_LEN):
            high_embeddings = all_test_repr[:, i, :].squeeze()
            low_embeddings = low_repr[:, i, :].squeeze()
            high_dists = np.linalg.norm(high_embeddings - high_embeddings[e], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[e], axis=1)
            high_ranking = np.argsort(high_dists)
            low_ranking = np.argsort(low_dists)
            corr = evaluate_proj_temporal_weighted_global_corr(high_ranking, low_ranking)
            corrs[i] = corr
        return corrs.mean()

    def eval_temporal_local_corr_train(self, epoch, stage, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p
        timeline = np.arange(start, end + period, period)

        stage_idxs = np.array_split(timeline, stage)
        selected_stage = stage_idxs[np.where([epoch in i for i in stage_idxs])[0][0]]

        LEN = self.data_provider.train_num
        EPOCH = len(selected_stage)
        repr_dim = self.data_provider.representation_dim
        all_train_repr = np.zeros((EPOCH, LEN, repr_dim))
        low_repr = np.zeros((EPOCH, LEN, 2))

        s = selected_stage[0]

        for i in selected_stage:
            index = (i - s) // period
            all_train_repr[index] = self.data_provider.train_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_train_repr[index])

        corrs = np.zeros(LEN)
        for i in range(LEN):
            high_embeddings = all_train_repr[:, i, :]
            low_embeddings = low_repr[:, i, :]

            high_dists = np.linalg.norm(high_embeddings - high_embeddings[(epoch - s) // period], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[(epoch - s) // period], axis=1)
            corr, _ = stats.spearmanr(high_dists, low_dists)
            corrs[i] = corr
        return corrs.mean()

    def eval_temporal_local_corr_test(self, epoch, stage, start=None, end=None, period=None):
        if start is None:
            start = self.data_provider.s
            end = self.data_provider.e
            period = self.data_provider.p

        timeline = np.arange(start, end + period, period)

        stage_idxs = np.array_split(timeline, stage)
        selected_stage = stage_idxs[np.where([epoch in i for i in stage_idxs])[0][0]]
        s = selected_stage[0]

        TEST_LEN = self.data_provider.test_num
        EPOCH = len(selected_stage)
        repr_dim = self.data_provider.representation_dim

        all_test_repr = np.zeros((EPOCH, TEST_LEN, repr_dim))
        low_repr = np.zeros((EPOCH, TEST_LEN, 2))
        for i in selected_stage:
            index = (i - s) // period
            all_test_repr[index] = self.data_provider.test_representation(i)
            low_repr[index] = self.projector.batch_project(i, all_test_repr[index])

        corrs = np.zeros(TEST_LEN)
        e = (epoch - s) // period
        for i in range(TEST_LEN):
            high_embeddings = all_test_repr[:, i, :]
            low_embeddings = low_repr[:, i, :]
            high_dists = np.linalg.norm(high_embeddings - high_embeddings[e], axis=1)
            low_dists = np.linalg.norm(low_embeddings - low_embeddings[e], axis=1)
            corr, _ = stats.spearmanr(high_dists, low_dists)
            corrs[i] = corr
        return corrs.mean()
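
    # The *_global_corr_* metrics rank every checkpoint against the query epoch with a
    # Spearman correlation between high- and low-dimensional distances; the *_local_corr_*
    # variants first split the timeline into `stage` contiguous segments and only rank
    # checkpoints within the segment that contains the query epoch.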

    def eval_moving_invariants_train(self, e_s, e_t, resolution=500):
        train_data_s = self.data_provider.train_representation(e_s)
        train_data_t = self.data_provider.train_representation(e_t)

        pred_s = self.data_provider.get_pred(e_s, train_data_s)
        pred_t = self.data_provider.get_pred(e_t, train_data_t)

        low_s = self.projector.batch_project(e_s, train_data_s)
        low_t = self.projector.batch_project(e_t, train_data_t)

        s_B = is_B(pred_s)
        t_B = is_B(pred_t)

        predictions_s = pred_s.argmax(1)
        predictions_t = pred_t.argmax(1)

        # moving samples: confidently classified at both epochs but with different predictions
        confident_sample = np.logical_and(np.logical_not(s_B), np.logical_not(t_B))
        diff_pred = predictions_s != predictions_t

        selected = np.logical_and(diff_pred, confident_sample)

        vis = visualizer(self.data_provider, self.projector, resolution, cmap='tab10')
        grid_view_s, _ = vis.get_epoch_decision_view(e_s, resolution)
        grid_view_t, _ = vis.get_epoch_decision_view(e_t, resolution)

        grid_view_s = grid_view_s.reshape(resolution * resolution, -1)
        grid_view_t = grid_view_t.reshape(resolution * resolution, -1)

        grid_samples_s = self.projector.batch_inverse(e_s, grid_view_s)
        grid_samples_t = self.projector.batch_inverse(e_t, grid_view_t)

        grid_pred_s = self.data_provider.get_pred(e_s, grid_samples_s) + 1e-8
        grid_pred_t = self.data_provider.get_pred(e_t, grid_samples_t) + 1e-8

        grid_s_B = is_B(grid_pred_s)
        grid_t_B = is_B(grid_pred_t)

        grid_predictions_s = grid_pred_s.argmax(1)
        grid_predictions_t = grid_pred_t.argmax(1)

        # check that each sample's nearest grid point agrees with it, both on the
        # predicted label and on whether it lies near a decision boundary
        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_s)
        _, knn_indices = high_neigh.kneighbors(low_s, n_neighbors=1, return_distance=True)

        close_s_pred = grid_predictions_s[knn_indices].squeeze()
        close_s_B = grid_s_B[knn_indices].squeeze()
        s_true = np.logical_and(close_s_pred == predictions_s, close_s_B == s_B)

        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_t)
        _, knn_indices = high_neigh.kneighbors(low_t, n_neighbors=1, return_distance=True)

        close_t_pred = grid_predictions_t[knn_indices].squeeze()
        close_t_B = grid_t_B[knn_indices].squeeze()
        t_true = np.logical_and(close_t_pred == predictions_t, close_t_B == t_B)

        moving_sample_num = np.sum(selected)
        true_num = np.sum(np.logical_and(s_true[selected], t_true[selected]))
        print(f'moving invariant Low/High:\t{true_num}/{moving_sample_num}')

        return true_num, moving_sample_num

    def eval_moving_invariants_test(self, e_s, e_t, resolution=500):
        test_data_s = self.data_provider.test_representation(e_s)
        test_data_t = self.data_provider.test_representation(e_t)

        pred_s = self.data_provider.get_pred(e_s, test_data_s)
        pred_t = self.data_provider.get_pred(e_t, test_data_t)

        low_s = self.projector.batch_project(e_s, test_data_s)
        low_t = self.projector.batch_project(e_t, test_data_t)

        s_B = is_B(pred_s)
        t_B = is_B(pred_t)

        predictions_s = pred_s.argmax(1)
        predictions_t = pred_t.argmax(1)

        confident_sample = np.logical_and(np.logical_not(s_B), np.logical_not(t_B))
        diff_pred = predictions_s != predictions_t

        selected = np.logical_and(diff_pred, confident_sample)

        vis = visualizer(self.data_provider, self.projector, resolution, cmap='tab10')
        grid_view_s, _ = vis.get_epoch_decision_view(e_s, resolution)
        grid_view_t, _ = vis.get_epoch_decision_view(e_t, resolution)

        grid_view_s = grid_view_s.reshape(resolution * resolution, -1)
        grid_view_t = grid_view_t.reshape(resolution * resolution, -1)

        grid_samples_s = self.projector.batch_inverse(e_s, grid_view_s)
        grid_samples_t = self.projector.batch_inverse(e_t, grid_view_t)

        grid_pred_s = self.data_provider.get_pred(e_s, grid_samples_s) + 1e-8
        grid_pred_t = self.data_provider.get_pred(e_t, grid_samples_t) + 1e-8

        grid_s_B = is_B(grid_pred_s)
        grid_t_B = is_B(grid_pred_t)

        grid_predictions_s = grid_pred_s.argmax(1)
        grid_predictions_t = grid_pred_t.argmax(1)

        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_s)
        _, knn_indices = high_neigh.kneighbors(low_s, n_neighbors=1, return_distance=True)

        close_s_pred = grid_predictions_s[knn_indices].squeeze()
        close_s_B = grid_s_B[knn_indices].squeeze()
        s_true = np.logical_and(close_s_pred == predictions_s, close_s_B == s_B)

        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_t)
        _, knn_indices = high_neigh.kneighbors(low_t, n_neighbors=1, return_distance=True)

        close_t_pred = grid_predictions_t[knn_indices].squeeze()
        close_t_B = grid_t_B[knn_indices].squeeze()
        t_true = np.logical_and(close_t_pred == predictions_t, close_t_B == t_B)

        moving_sample_num = np.sum(selected)
        true_num = np.sum(np.logical_and(s_true[selected], t_true[selected]))
        print(f'moving invariant Low/High:\t{true_num}/{moving_sample_num}')

        return true_num, moving_sample_num
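
    # "Moving invariants" look at samples whose prediction flips between epoch e_s and
    # epoch e_t while being confidently classified (not boundary) at both; the score is
    # the fraction of those samples whose nearest decision-view grid points agree with
    # their own prediction/boundary status at both epochs, i.e. the movement visible in
    # the 2-D plot reflects a real change of the subject model.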

    def eval_fixing_invariants_train(self, e_s, e_t, high_threshold, low_threshold, metric="euclidean"):
        train_data_s = self.data_provider.train_representation(e_s)
        train_data_t = self.data_provider.train_representation(e_t)

        pred_s = self.data_provider.get_pred(e_s, train_data_s)
        pred_t = self.data_provider.get_pred(e_t, train_data_t)
        softmax_s = softmax(pred_s, axis=1)
        softmax_t = softmax(pred_t, axis=1)

        low_s = self.projector.batch_project(e_s, train_data_s)
        low_t = self.projector.batch_project(e_t, train_data_t)

        # rescale both embeddings so that the larger extent spans roughly 100 units
        y_max = max(low_s[:, 1].max(), low_t[:, 1].max())
        y_min = min(low_s[:, 1].min(), low_t[:, 1].min())
        x_max = max(low_s[:, 0].max(), low_t[:, 0].max())
        x_min = min(low_s[:, 0].min(), low_t[:, 0].min())
        scale = min(100 / (x_max - x_min), 100 / (y_max - y_min))
        low_t = low_t * scale
        low_s = low_s * scale

        # high-dimensional movement of each sample between the two epochs
        if metric == "euclidean":
            high_dists = np.linalg.norm(train_data_s - train_data_t, axis=1)
        elif metric == "cosine":
            high_dists = np.array([cosine(train_data_s[i], train_data_t[i]) for i in range(len(train_data_s))])
        elif metric == "softmax":
            high_dists = np.array([js_div(softmax_s[i], softmax_t[i]) for i in range(len(softmax_t))])
        low_dists = np.linalg.norm(low_s - low_t, axis=1)

        # samples that barely move in representation space should also barely move on screen
        selected = high_dists <= high_threshold

        return np.sum(np.logical_and(selected, low_dists <= low_threshold)), np.sum(selected)

    def eval_fixing_invariants_test(self, e_s, e_t, high_threshold, low_threshold, metric="euclidean"):
        test_data_s = self.data_provider.test_representation(e_s)
        test_data_t = self.data_provider.test_representation(e_t)

        pred_s = self.data_provider.get_pred(e_s, test_data_s)
        pred_t = self.data_provider.get_pred(e_t, test_data_t)
        softmax_s = softmax(pred_s, axis=1)
        softmax_t = softmax(pred_t, axis=1)

        low_s = self.projector.batch_project(e_s, test_data_s)
        low_t = self.projector.batch_project(e_t, test_data_t)

        # rescale both embeddings so that the larger extent spans roughly 100 units
        y_max = max(low_s[:, 1].max(), low_t[:, 1].max())
        y_min = min(low_s[:, 1].min(), low_t[:, 1].min())
        x_max = max(low_s[:, 0].max(), low_t[:, 0].max())
        x_min = min(low_s[:, 0].min(), low_t[:, 0].min())
        scale = min(100 / (x_max - x_min), 100 / (y_max - y_min))
        low_t = low_t * scale
        low_s = low_s * scale

        # high-dimensional movement of each sample between the two epochs
        if metric == "euclidean":
            high_dists = np.linalg.norm(test_data_s - test_data_t, axis=1)
        elif metric == "cosine":
            high_dists = np.array([cosine(test_data_s[i], test_data_t[i]) for i in range(len(test_data_s))])
        elif metric == "softmax":
            high_dists = np.array([js_div(softmax_s[i], softmax_t[i]) for i in range(len(softmax_t))])
        low_dists = np.linalg.norm(low_s - low_t, axis=1)

        # samples that barely move in representation space should also barely move on screen
        selected = high_dists <= high_threshold

        return np.sum(np.logical_and(selected, low_dists <= low_threshold)), np.sum(selected)

    def eval_proj_invariants_train(self, e, resolution=500):
        train_data = self.data_provider.train_representation(e)
        pred_s = self.data_provider.get_pred(e, train_data)
        low_s = self.projector.batch_project(e, train_data)
        s_B = is_B(pred_s)
        predictions_s = pred_s.argmax(1)

        vis = visualizer(self.data_provider, self.projector, resolution, cmap='tab10')
        grid_view_s, _ = vis.get_epoch_decision_view(e, resolution)
        grid_view_s = grid_view_s.reshape(resolution * resolution, -1)
        grid_samples_s = self.projector.batch_inverse(e, grid_view_s)
        grid_pred_s = self.data_provider.get_pred(e, grid_samples_s) + 1e-8
        grid_s_B = is_B(grid_pred_s)
        grid_predictions_s = grid_pred_s.argmax(1)

        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_s)
        _, knn_indices = high_neigh.kneighbors(low_s, n_neighbors=1, return_distance=True)

        close_s_pred = grid_predictions_s[knn_indices].squeeze()
        close_s_B = grid_s_B[knn_indices].squeeze()

        border_true = np.logical_and(s_B, close_s_B)
        pred_true = np.logical_and(close_s_pred == predictions_s, np.logical_not(s_B))

        print("border fixing invariants:\t{}/{}".format(np.sum(border_true), np.sum(s_B)))
        print("prediction fixing invariants:\t{}/{}".format(np.sum(pred_true), np.sum(np.logical_not(s_B))))
        print("invariants:\t{}/{}".format(np.sum(border_true) + np.sum(pred_true), len(train_data)))

        return np.sum(border_true), np.sum(pred_true), len(train_data)

    def eval_proj_invariants_test(self, e, resolution=500):
        test_data = self.data_provider.test_representation(e)
        pred_s = self.data_provider.get_pred(e, test_data)
        low_s = self.projector.batch_project(e, test_data)
        s_B = is_B(pred_s)
        predictions_s = pred_s.argmax(1)

        vis = visualizer(self.data_provider, self.projector, resolution, cmap='tab10')
        grid_view_s, _ = vis.get_epoch_decision_view(e, resolution)
        grid_view_s = grid_view_s.reshape(resolution * resolution, -1)
        grid_samples_s = self.projector.batch_inverse(e, grid_view_s)
        grid_pred_s = self.data_provider.get_pred(e, grid_samples_s) + 1e-8
        grid_s_B = is_B(grid_pred_s)
        grid_predictions_s = grid_pred_s.argmax(1)

        high_neigh = NearestNeighbors(n_neighbors=1, radius=0.4)
        high_neigh.fit(grid_view_s)
        _, knn_indices = high_neigh.kneighbors(low_s, n_neighbors=1, return_distance=True)

        close_s_pred = grid_predictions_s[knn_indices].squeeze()
        close_s_B = grid_s_B[knn_indices].squeeze()

        border_true = np.logical_and(s_B, close_s_B)
        pred_true = np.logical_and(close_s_pred == predictions_s, np.logical_not(s_B))

        print("border fixing invariants:\t{}/{}".format(np.sum(border_true), np.sum(s_B)))
        print("prediction fixing invariants:\t{}/{}".format(np.sum(pred_true), np.sum(np.logical_not(s_B))))
        print("invariants:\t{}/{}".format(np.sum(border_true) + np.sum(pred_true), len(test_data)))

        return np.sum(border_true), np.sum(pred_true), len(test_data)
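
    # Projection invariants for a single epoch: border samples should land in border
    # regions of the decision view, and non-border samples should land in regions whose
    # nearest grid point carries the same predicted label. Both counts are reported
    # against the corresponding population sizes.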

    def train_acc(self, epoch):
        data = self.data_provider.train_representation(epoch)
        labels = self.data_provider.train_labels(epoch)
        pred = self.data_provider.get_pred(epoch, data).argmax(1)
        return np.sum(labels == pred) / len(labels)

    def test_acc(self, epoch):
        data = self.data_provider.test_representation(epoch)
        labels = self.data_provider.test_labels(epoch)
        pred = self.data_provider.get_pred(epoch, data).argmax(1)
        return np.sum(labels == pred) / len(labels)

    def save_epoch_eval(self, n_epoch, n_neighbors, temporal_k=5, file_name="evaluation"):
        save_dir = os.path.join(self.data_provider.model_path)
        save_file = os.path.join(save_dir, file_name + ".json")
        if not os.path.exists(save_file):
            evaluation = dict()
        else:
            f = open(save_file, "r")
            evaluation = json.load(f)
            f.close()
        n_key = str(n_neighbors)

        if "train_acc" not in evaluation.keys():
            evaluation["train_acc"] = dict()
        if "test_acc" not in evaluation.keys():
            evaluation["test_acc"] = dict()
        if "nn_train" not in evaluation:
            evaluation["nn_train"] = dict()
        if "nn_test" not in evaluation:
            evaluation["nn_test"] = dict()
        if "b_train" not in evaluation:
            evaluation["b_train"] = dict()
        if "b_test" not in evaluation:
            evaluation["b_test"] = dict()
        if "ppr_train" not in evaluation.keys():
            evaluation["ppr_train"] = dict()
        if "ppr_test" not in evaluation.keys():
            evaluation["ppr_test"] = dict()
        if "ppr_dist_train" not in evaluation.keys():
            evaluation["ppr_dist_train"] = dict()
        if "ppr_dist_test" not in evaluation.keys():
            evaluation["ppr_dist_test"] = dict()
        if "tnn_train" not in evaluation.keys():
            evaluation["tnn_train"] = dict()
        if "tnn_test" not in evaluation.keys():
            evaluation["tnn_test"] = dict()
        if "tr_train" not in evaluation.keys():
            evaluation["tr_train"] = dict()
        if "tr_test" not in evaluation.keys():
            evaluation["tr_test"] = dict()
        if "wtr_train" not in evaluation.keys():
            evaluation["wtr_train"] = dict()
        if "wtr_test" not in evaluation.keys():
            evaluation["wtr_test"] = dict()
        if "tlr_train" not in evaluation.keys():
            evaluation["tlr_train"] = dict()
        if "tlr_test" not in evaluation.keys():
            evaluation["tlr_test"] = dict()

        if "temporal_train_mean" not in evaluation.keys():
            evaluation["temporal_train_mean"] = dict()
        if "temporal_test_mean" not in evaluation.keys():
            evaluation["temporal_test_mean"] = dict()

        epoch_key = str(n_epoch)
        if epoch_key not in evaluation["nn_train"]:
            evaluation["nn_train"][epoch_key] = dict()
        evaluation["nn_train"][epoch_key][n_key] = self.eval_nn_train(n_epoch, n_neighbors)
        if epoch_key not in evaluation["nn_test"]:
            evaluation["nn_test"][epoch_key] = dict()
        evaluation["nn_test"][epoch_key][n_key] = self.eval_nn_test(n_epoch, n_neighbors)

        evaluation["ppr_train"][epoch_key] = self.eval_inv_train(n_epoch)
        evaluation["ppr_test"][epoch_key] = self.eval_inv_test(n_epoch)

        evaluation["train_acc"][epoch_key] = self.train_acc(n_epoch)
        evaluation["test_acc"][epoch_key] = self.test_acc(n_epoch)

        with open(save_file, "w") as f:
            json.dump(evaluation, f)
        if self.verbose:
            print("Successfully saved evaluation with {:d} neighbors...".format(n_neighbors))

    def get_eval(self, file_name="evaluation"):
        save_dir = os.path.join(self.data_provider.model_path, file_name + ".json")
        f = open(save_dir, "r")
        evaluation = json.load(f)
        f.close()
        return evaluation
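

# A minimal end-to-end sketch (hedged): the exact data provider and projector
# constructors depend on the rest of the singleVis package, so the names below
# are assumptions, not the package's real API.
#
#   data_provider = DataProvider(content_path, model, s=1, e=20, p=1)   # hypothetical
#   projector = Projector(vis_model, content_path)                      # hypothetical
#   evaluator = Evaluator(data_provider, projector, verbose=1)
#   evaluator.save_epoch_eval(n_epoch=10, n_neighbors=15, file_name="evaluation")
#   print(evaluator.get_eval("evaluation")["nn_train"]["10"]["15"])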


class SegEvaluator(Evaluator):
    def __init__(self, data_provider, projector, exp, verbose=1):
        super().__init__(data_provider, projector, verbose)
        self.exp = exp

    def save_epoch_eval(self, n_epoch, n_neighbors, temporal_k=5, file_name="evaluation"):
        save_dir = os.path.join(self.data_provider.model_path, "{}".format(self.exp))
        save_file = os.path.join(save_dir, file_name + ".json")
        if not os.path.exists(save_file):
            evaluation = dict()
        else:
            f = open(save_file, "r")
            evaluation = json.load(f)
            f.close()
        n_key = str(n_neighbors)

        if "nn_train" not in evaluation:
            evaluation["nn_train"] = dict()
        if "nn_test" not in evaluation:
            evaluation["nn_test"] = dict()

        if "ppr_train" not in evaluation.keys():
            evaluation["ppr_train"] = dict()
        if "ppr_test" not in evaluation.keys():
            evaluation["ppr_test"] = dict()

        if "tlr_train" not in evaluation.keys():
            evaluation["tlr_train"] = dict()
        if "tlr_test" not in evaluation.keys():
            evaluation["tlr_test"] = dict()

        epoch_key = str(n_epoch)
        if epoch_key not in evaluation["nn_train"]:
            evaluation["nn_train"][epoch_key] = dict()
        evaluation["nn_train"][epoch_key][n_key] = self.eval_nn_train(n_epoch, n_neighbors)
        if epoch_key not in evaluation["nn_test"]:
            evaluation["nn_test"][epoch_key] = dict()
        evaluation["nn_test"][epoch_key][n_key] = self.eval_nn_test(n_epoch, n_neighbors)

        evaluation["ppr_train"][epoch_key] = self.eval_inv_train(n_epoch)
        evaluation["ppr_test"][epoch_key] = self.eval_inv_test(n_epoch)

        evaluation["tlr_train"][epoch_key] = self.eval_temporal_local_corr_train(n_epoch, 3)
        evaluation["tlr_test"][epoch_key] = self.eval_temporal_local_corr_test(n_epoch, 3)

        with open(save_file, "w") as f:
            json.dump(evaluation, f)
        if self.verbose:
            print("Successfully saved evaluation with {:d} neighbors...".format(n_neighbors))

    def get_eval(self, file_name="evaluation"):
        save_dir = os.path.join(self.data_provider.model_path, "{}".format(self.exp), file_name + ".json")
        f = open(save_dir, "r")
        evaluation = json.load(f)
        f.close()
        return evaluation


class ALEvaluator(Evaluator):
    def __init__(self, data_provider, projector, verbose=1):
        super().__init__(data_provider, projector, verbose)

    def train_acc(self, epoch):
        data = self.data_provider.train_representation(epoch)
        labels = self.data_provider.train_labels(epoch)
        pred = self.data_provider.get_pred(epoch, data).argmax(1)
        return np.sum(labels == pred) / len(labels)

    def save_epoch_eval(self, n_epoch, file_name="evaluation"):
        save_dir = os.path.join(self.data_provider.model_path)
        save_file = os.path.join(save_dir, file_name + ".json")
        if not os.path.exists(save_file):
            evaluation = dict()
        else:
            f = open(save_file, "r")
            evaluation = json.load(f)
            f.close()
        if "train_acc" not in evaluation.keys():
            evaluation["train_acc"] = dict()
        if "test_acc" not in evaluation.keys():
            evaluation["test_acc"] = dict()
        epoch_key = str(n_epoch)

        evaluation["train_acc"][epoch_key] = self.train_acc(n_epoch)
        evaluation["test_acc"][epoch_key] = self.test_acc(n_epoch)

        with open(save_file, "w") as f:
            json.dump(evaluation, f)
        if self.verbose:
            print("Successfully saved evaluation for iteration {}".format(epoch_key))


class DenseALEvaluator(Evaluator):
    def __init__(self, data_provider, projector, verbose=1):
        super().__init__(data_provider, projector, verbose)