import copy
import logging
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader
from utils.toolkit import tensor2numpy, accuracy
from scipy.spatial.distance import cdist
import os
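# EPSILON guards divisions during feature L2-normalisation; batch_size is used
# by every internal evaluation/extraction DataLoader.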
EPSILON = 1e-8
batch_size = 64
class BaseLearner(object):
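    """Base class for class-incremental learners: tracks seen classes, manages
    an exemplar rehearsal memory, and provides CNN/NME evaluation helpers."""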
def __init__(self, args):
self.args = args
self._cur_task = -1
self._known_classes = 0
self._total_classes = 0
self.class_list = []
self._network = None
self._old_network = None
self._data_memory, self._targets_memory = np.array([]), np.array([])
self.topk = 5
self._memory_size = args["memory_size"]
self._memory_per_class = args.get("memory_per_class", None)
self._fixed_memory = args.get("fixed_memory", False)
self._device = args["device"][0]
self._multiple_gpus = args["device"]
@property
def exemplar_size(self):
assert len(self._data_memory) == len(
self._targets_memory
), "Exemplar size error."
return len(self._targets_memory)
@property
def samples_per_class(self):
if self._fixed_memory:
return self._memory_per_class
else:
assert self._total_classes != 0, "Total classes is 0"
return self._memory_size // self._total_classes
@property
def feature_dim(self):
if isinstance(self._network, nn.DataParallel):
return self._network.module.feature_dim
else:
return self._network.feature_dim
    def build_rehearsal_memory(self, data_manager, per_class):
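        """Fill the exemplar memory: with a fixed per-class quota, construct all
        exemplar sets at once; otherwise shrink old sets first so the total
        budget is shared evenly across all classes seen so far."""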
if self._fixed_memory:
self._construct_exemplar_unified(data_manager, per_class)
else:
self._reduce_exemplar(data_manager, per_class)
self._construct_exemplar(data_manager, per_class)
    def load_checkpoint(self, filename):
        pass
def save_checkpoint(self, filename):
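        # Move weights to CPU first so the checkpoint loads on any device.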
self._network.cpu()
save_dict = {
"tasks": self._cur_task,
"model_state_dict": self._network.state_dict(),
}
torch.save(save_dict, "./{}/{}_{}.pkl".format(filename, self.args['model_name'], self._cur_task))
def after_task(self):
pass
    def _evaluate(self, y_pred, y_true, group=10):
        """Assemble grouped, top-1, and top-k accuracies into a result dict."""
        ret = {}
        grouped = accuracy(y_pred.T[0], y_true, self._known_classes, increment=group)
ret["grouped"] = grouped
ret["top1"] = grouped["total"]
ret["top{}".format(self.topk)] = np.around(
(y_pred.T == np.tile(y_true, (self.topk, 1))).sum() * 100 / len(y_true),
decimals=2,
)
return ret
    def eval_task(self, data=None, save_conf=False, group=10, mode="train"):
        if data is None:
            data = self.test_loader
        y_pred, y_true = self._eval_cnn(data, mode=mode)
        cnn_accy = self._evaluate(y_pred, y_true, group=group)
if hasattr(self, "_class_means"):
y_pred, y_true = self._eval_nme(data, self._class_means)
nme_accy = self._evaluate(y_pred, y_true)
else:
nme_accy = None
if save_conf:
_pred = y_pred.T[0]
_pred_path = os.path.join(self.args['logfilename'], "pred.npy")
_target_path = os.path.join(self.args['logfilename'], "target.npy")
np.save(_pred_path, _pred)
np.save(_target_path, y_true)
            _save_dir = f"./results/{self.args['model_name']}/conf_matrix/{self.args['prefix']}"
os.makedirs(_save_dir, exist_ok=True)
_save_path = os.path.join(_save_dir, f"{self.args['csv_name']}.csv")
with open(_save_path, "a+") as f:
f.write(f"{self.args['model_name']},{_pred_path},{_target_path} \n")
return cnn_accy, nme_accy
def incremental_train(self):
pass
def _train(self):
pass
def _get_memory(self):
if len(self._data_memory) == 0:
return None
else:
return (self._data_memory, self._targets_memory)
def _compute_accuracy(self, model, loader):
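        """Return top-1 accuracy (in %) of `model` over `loader`."""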
model.eval()
correct, total = 0, 0
for i, (_, inputs, targets) in enumerate(loader):
inputs = inputs.to(self._device)
with torch.no_grad():
outputs = model(inputs)["logits"]
predicts = torch.max(outputs, dim=1)[1]
correct += (predicts.cpu() == targets).sum()
total += len(targets)
return np.around(tensor2numpy(correct) * 100 / total, decimals=2)
    def _eval_cnn(self, loader, mode="train"):
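        """Collect top-k CNN predictions over `loader`; in "test" mode the
        predicted indices are mapped back to original ids via `self.class_list`."""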
self._network.eval()
y_pred, y_true = [], []
for _, (_, inputs, targets) in enumerate(loader):
inputs = inputs.to(self._device)
with torch.no_grad():
outputs = self._network(inputs)["logits"]
            # Clamp top-k to the number of classes seen so far.
            if self.topk > self._total_classes:
                self.topk = self._total_classes
            predicts = torch.topk(
                outputs, k=self.topk, dim=1, largest=True, sorted=True
            )[1]  # [bs, topk]
refine_predicts = predicts.cpu().numpy()
if mode == "test":
refine_predicts = self.class_list[refine_predicts]
y_pred.append(refine_predicts)
y_true.append(targets.cpu().numpy())
return np.concatenate(y_pred), np.concatenate(y_true) # [N, topk]
    def inference(self, image):
        """Single-batch inference helper returning readable top-k predictions.

        When `self.class_list` is set, prediction indices are mapped back to
        original class ids, with labels and confidences appended for the first
        sample in the batch.
        """
        self._network.eval()
        self._network.to(self._device)
        image = image.to(self._device, dtype=torch.float32)
        with torch.no_grad():
            output = self._network(image)["logits"]
        if self.topk > self._total_classes:
            self.topk = self._total_classes
        predict = torch.topk(
            output, k=self.topk, dim=1, largest=True, sorted=True
        )[1]
        confidents = softmax(output.cpu().numpy())
        predicts = predict.cpu().numpy()
        if self.class_list is not None:
            self.class_list = np.array(self.class_list)
            result = self.class_list[predicts].tolist()
            result.append([self.label_list[item] for item in result[0]])
            result.append(confidents[0][predicts][0].tolist())
            return result
        elif self.data_manager is not None:
            return self.data_manager.class_list[predicts]
        # Fallback: raw top-k indices plus their readable labels.
        predicts = predicts.tolist()
        predicts.append([self.label_list[index] for index in predicts[0]])
        return predicts
def _eval_nme(self, loader, class_means):
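        """Nearest-mean-of-exemplars evaluation: rank classes by squared
        Euclidean distance between L2-normalised features and class means."""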
self._network.eval()
vectors, y_true = self._extract_vectors(loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
dists = cdist(class_means, vectors, "sqeuclidean") # [nb_classes, N]
scores = dists.T # [N, nb_classes], choose the one with the smallest distance
return np.argsort(scores, axis=1)[:, : self.topk], y_true # [N, topk]
def _extract_vectors(self, loader):
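        """Run the backbone over `loader` and return (features, targets) as numpy arrays."""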
self._network.eval()
vectors, targets = [], []
for _, _inputs, _targets in loader:
_targets = _targets.numpy()
if isinstance(self._network, nn.DataParallel):
_vectors = tensor2numpy(
self._network.module.extract_vector(_inputs.to(self._device))
)
else:
_vectors = tensor2numpy(
self._network.extract_vector(_inputs.to(self._device))
)
vectors.append(_vectors)
targets.append(_targets)
return np.concatenate(vectors), np.concatenate(targets)
def _reduce_exemplar(self, data_manager, m):
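        """Trim each old class's exemplar set to `m` samples and recompute its
        normalised feature mean with the current network."""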
logging.info("Reducing exemplars...({} per classes)".format(m))
dummy_data, dummy_targets = copy.deepcopy(self._data_memory), copy.deepcopy(
self._targets_memory
)
self._class_means = np.zeros((self._total_classes, self.feature_dim))
self._data_memory, self._targets_memory = np.array([]), np.array([])
for class_idx in range(self._known_classes):
mask = np.where(dummy_targets == class_idx)[0]
dd, dt = dummy_data[mask][:m], dummy_targets[mask][:m]
self._data_memory = (
np.concatenate((self._data_memory, dd))
if len(self._data_memory) != 0
else dd
)
self._targets_memory = (
np.concatenate((self._targets_memory, dt))
if len(self._targets_memory) != 0
else dt
)
# Exemplar mean
idx_dataset = data_manager.get_dataset(
[], source="train", mode="test", appendent=(dd, dt)
)
idx_loader = DataLoader(
idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(idx_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
mean = np.mean(vectors, axis=0)
mean = mean / np.linalg.norm(mean)
self._class_means[class_idx, :] = mean
def _construct_exemplar(self, data_manager, m):
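        """Greedy herding selection (as in iCaRL): for each new class, pick `m`
        samples whose running feature mean best approximates the class mean."""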
logging.info("Constructing exemplars...({} per classes)".format(m))
for class_idx in range(self._known_classes, self._total_classes):
data, targets, idx_dataset = data_manager.get_dataset(
np.arange(class_idx, class_idx + 1),
source="train",
mode="test",
ret_data=True,
)
idx_loader = DataLoader(
idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(idx_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
class_mean = np.mean(vectors, axis=0)
# Select
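            # Herding: at step k, greedily pick the sample whose inclusion keeps
            # the mean of the selected set closest to the true class mean.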
selected_exemplars = []
exemplar_vectors = [] # [n, feature_dim]
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of the already selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] candidate mean if each remaining vector were added
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))
                selected_exemplars.append(
                    np.array(data[i])
                )  # Copy to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # Copy to avoid passing by reference
                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection
selected_exemplars = np.array(selected_exemplars)
exemplar_targets = np.full(m, class_idx)
self._data_memory = (
np.concatenate((self._data_memory, selected_exemplars))
if len(self._data_memory) != 0
else selected_exemplars
)
self._targets_memory = (
np.concatenate((self._targets_memory, exemplar_targets))
if len(self._targets_memory) != 0
else exemplar_targets
)
# Exemplar mean
idx_dataset = data_manager.get_dataset(
[],
source="train",
mode="test",
appendent=(selected_exemplars, exemplar_targets),
)
idx_loader = DataLoader(
idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(idx_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
mean = np.mean(vectors, axis=0)
mean = mean / np.linalg.norm(mean)
self._class_means[class_idx, :] = mean
def _construct_exemplar_unified(self, data_manager, m):
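        """Fixed-memory variant: recompute old-class means with the current
        network, then herd `m` exemplars for each new class."""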
logging.info(
"Constructing exemplars for new classes...({} per classes)".format(m)
)
_class_means = np.zeros((self._total_classes, self.feature_dim))
# Calculate the means of old classes with newly trained network
for class_idx in range(self._known_classes):
mask = np.where(self._targets_memory == class_idx)[0]
class_data, class_targets = (
self._data_memory[mask],
self._targets_memory[mask],
)
class_dset = data_manager.get_dataset(
[], source="train", mode="test", appendent=(class_data, class_targets)
)
class_loader = DataLoader(
class_dset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(class_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
mean = np.mean(vectors, axis=0)
mean = mean / np.linalg.norm(mean)
_class_means[class_idx, :] = mean
# Construct exemplars for new classes and calculate the means
for class_idx in range(self._known_classes, self._total_classes):
data, targets, class_dset = data_manager.get_dataset(
np.arange(class_idx, class_idx + 1),
source="train",
mode="test",
ret_data=True,
)
class_loader = DataLoader(
class_dset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(class_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
class_mean = np.mean(vectors, axis=0)
# Select
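            # Herding: at step k, greedily pick the sample whose inclusion keeps
            # the mean of the selected set closest to the true class mean.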
selected_exemplars = []
exemplar_vectors = []
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of the already selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] candidate mean if each remaining vector were added
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))
                selected_exemplars.append(
                    np.array(data[i])
                )  # Copy to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # Copy to avoid passing by reference
                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection
selected_exemplars = np.array(selected_exemplars)
exemplar_targets = np.full(m, class_idx)
self._data_memory = (
np.concatenate((self._data_memory, selected_exemplars))
if len(self._data_memory) != 0
else selected_exemplars
)
self._targets_memory = (
np.concatenate((self._targets_memory, exemplar_targets))
if len(self._targets_memory) != 0
else exemplar_targets
)
# Exemplar mean
exemplar_dset = data_manager.get_dataset(
[],
source="train",
mode="test",
appendent=(selected_exemplars, exemplar_targets),
)
exemplar_loader = DataLoader(
exemplar_dset, batch_size=batch_size, shuffle=False, num_workers=4
)
vectors, _ = self._extract_vectors(exemplar_loader)
vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
mean = np.mean(vectors, axis=0)
mean = mean / np.linalg.norm(mean)
_class_means[class_idx, :] = mean
self._class_means = _class_means
def softmax(x):
    """Numerically stable softmax over the last axis of x.

    Summing over the last axis (rather than axis 0) keeps the result correct
    for batched [N, num_classes] inputs as well as 1-D score vectors.
    """
    e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return e_x / (e_x.sum(axis=-1, keepdims=True) + 1e-7)
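# A minimal usage sketch (illustrative only): BaseLearner is normally
# subclassed and driven via incremental_train(); the args keys below mirror
# those read in __init__, and the values are assumptions for demonstration.
if __name__ == "__main__":
    demo_args = {
        "memory_size": 2000,
        "memory_per_class": 20,
        "fixed_memory": False,
        "device": [torch.device("cpu")],
    }
    learner = BaseLearner(demo_args)
    learner._total_classes = 10  # pretend the first task introduced 10 classes
    print("samples per class:", learner.samples_per_class)  # 2000 // 10 = 200
    print("exemplar size:", learner.exemplar_size)  # memory is empty -> 0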