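"""Base learner for class-incremental learning.

Provides exemplar (rehearsal) memory management, herding-based exemplar
selection, and CNN / nearest-mean-of-exemplars (NME) evaluation, intended to
be subclassed by concrete incremental learners.
"""
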
import copy
import logging
import os

import numpy as np
import torch
from scipy.spatial.distance import cdist
from torch import nn
from torch.utils.data import DataLoader

from utils.toolkit import tensor2numpy, accuracy

EPSILON = 1e-8
batch_size = 64

class BaseLearner(object):
    """Base class for incremental learners: tracks the current task, holds the
    exemplar memory, and provides evaluation and exemplar-selection routines."""

    def __init__(self, args):
        self.args = args
        self._cur_task = -1
        self._known_classes = 0
        self._total_classes = 0
        self.class_list = []
        self._network = None
        self._old_network = None
        self._data_memory, self._targets_memory = np.array([]), np.array([])
        self.topk = 5

        self._memory_size = args["memory_size"]
        self._memory_per_class = args.get("memory_per_class", None)
        self._fixed_memory = args.get("fixed_memory", False)
        self._device = args["device"][0]
        self._multiple_gpus = args["device"]

    @property
    def exemplar_size(self):
        assert len(self._data_memory) == len(
            self._targets_memory
        ), "Exemplar size error."
        return len(self._targets_memory)

    @property
    def samples_per_class(self):
        if self._fixed_memory:
            return self._memory_per_class
        else:
            assert self._total_classes != 0, "Total classes is 0"
            return self._memory_size // self._total_classes

    @property
    def feature_dim(self):
        if isinstance(self._network, nn.DataParallel):
            return self._network.module.feature_dim
        else:
            return self._network.feature_dim

    def build_rehearsal_memory(self, data_manager, per_class):
        if self._fixed_memory:
            self._construct_exemplar_unified(data_manager, per_class)
        else:
            self._reduce_exemplar(data_manager, per_class)
            self._construct_exemplar(data_manager, per_class)

    def load_checkpoint(self, filename):
        pass

    def save_checkpoint(self, filename):
        self._network.cpu()
        save_dict = {
            "tasks": self._cur_task,
            "model_state_dict": self._network.state_dict(),
        }
        torch.save(
            save_dict,
            "./{}/{}_{}.pkl".format(filename, self.args["model_name"], self._cur_task),
        )

    def after_task(self):
        pass

    def _evaluate(self, y_pred, y_true, group=10):
        ret = {}
        grouped = accuracy(y_pred.T[0], y_true, self._known_classes, increment=group)
        ret["grouped"] = grouped
        ret["top1"] = grouped["total"]
        ret["top{}".format(self.topk)] = np.around(
            (y_pred.T == np.tile(y_true, (self.topk, 1))).sum() * 100 / len(y_true),
            decimals=2,
        )
        return ret

    def eval_task(self, data=None, save_conf=False, group=10, mode="train"):
        if data is None:
            data = self.test_loader
        y_pred, y_true = self._eval_cnn(data, mode=mode)
        cnn_accy = self._evaluate(y_pred, y_true, group=group)

        if hasattr(self, "_class_means"):
            y_pred, y_true = self._eval_nme(data, self._class_means)
            nme_accy = self._evaluate(y_pred, y_true)
        else:
            nme_accy = None

        if save_conf:
            _pred = y_pred.T[0]
            _pred_path = os.path.join(self.args["logfilename"], "pred.npy")
            _target_path = os.path.join(self.args["logfilename"], "target.npy")
            np.save(_pred_path, _pred)
            np.save(_target_path, y_true)

            _save_dir = f"./results/{self.args['model_name']}/conf_matrix/{self.args['prefix']}"
            os.makedirs(_save_dir, exist_ok=True)
            _save_path = os.path.join(_save_dir, f"{self.args['csv_name']}.csv")
            with open(_save_path, "a+") as f:
                f.write(f"{self.args['model_name']},{_pred_path},{_target_path}\n")

        return cnn_accy, nme_accy

    def incremental_train(self):
        pass

    def _train(self):
        pass

    def _get_memory(self):
        if len(self._data_memory) == 0:
            return None
        else:
            return (self._data_memory, self._targets_memory)

    def _compute_accuracy(self, model, loader):
        model.eval()
        correct, total = 0, 0
        for i, (_, inputs, targets) in enumerate(loader):
            inputs = inputs.to(self._device)
            with torch.no_grad():
                outputs = model(inputs)["logits"]
            predicts = torch.max(outputs, dim=1)[1]
            correct += (predicts.cpu() == targets).sum()
            total += len(targets)

        return np.around(tensor2numpy(correct) * 100 / total, decimals=2)

    def _eval_cnn(self, loader, mode="train"):
        self._network.eval()
        y_pred, y_true = [], []
        for _, (_, inputs, targets) in enumerate(loader):
            inputs = inputs.to(self._device)
            with torch.no_grad():
                outputs = self._network(inputs)["logits"]
            if self.topk > self._total_classes:
                self.topk = self._total_classes
            predicts = torch.topk(
                outputs, k=self.topk, dim=1, largest=True, sorted=True
            )[1]  # [bs, topk]
            refine_predicts = predicts.cpu().numpy()
            if mode == "test":
                # Map position indices back to original class ids
                # (class_list is populated by the concrete learner).
                refine_predicts = self.class_list[refine_predicts]
            y_pred.append(refine_predicts)
            y_true.append(targets.cpu().numpy())

        return np.concatenate(y_pred), np.concatenate(y_true)  # [N, topk]

    def inference(self, image):
        self._network.eval()
        self._network.to(self._device)
        image = image.to(self._device, dtype=torch.float32)
        with torch.no_grad():
            output = self._network(image)["logits"]
        if self.topk > self._total_classes:
            self.topk = self._total_classes
        predict = torch.topk(output, k=self.topk, dim=1, largest=True, sorted=True)[1]
        confidences = softmax(output.cpu().numpy())
        # self.label_list and self.data_manager are expected to be attached by
        # the concrete learner before inference is called.
        if self.class_list is not None:
            self.class_list = np.array(self.class_list)
            predicts = predict.cpu().numpy()
            result = self.class_list[predicts].tolist()
            result.append([self.label_list[item] for item in result[0]])
            result.append(confidences[0][predicts][0].tolist())
            return result
        elif self.data_manager is not None:
            return self.data_manager.class_list[predict.cpu().numpy()]
        predicts = predict.cpu().numpy().tolist()
        predicts.append([self.label_list[index] for index in predicts[0]])
        return predicts

    def _eval_nme(self, loader, class_means):
        self._network.eval()
        vectors, y_true = self._extract_vectors(loader)
        vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T

        dists = cdist(class_means, vectors, "sqeuclidean")  # [nb_classes, N]
        scores = dists.T  # [N, nb_classes], choose the one with the smallest distance
        return np.argsort(scores, axis=1)[:, : self.topk], y_true  # [N, topk]

    def _extract_vectors(self, loader):
        self._network.eval()
        vectors, targets = [], []
        for _, _inputs, _targets in loader:
            _targets = _targets.numpy()
            if isinstance(self._network, nn.DataParallel):
                _vectors = tensor2numpy(
                    self._network.module.extract_vector(_inputs.to(self._device))
                )
            else:
                _vectors = tensor2numpy(
                    self._network.extract_vector(_inputs.to(self._device))
                )

            vectors.append(_vectors)
            targets.append(_targets)

        return np.concatenate(vectors), np.concatenate(targets)

    def _reduce_exemplar(self, data_manager, m):
        logging.info("Reducing exemplars...({} per class)".format(m))
        dummy_data, dummy_targets = copy.deepcopy(self._data_memory), copy.deepcopy(
            self._targets_memory
        )
        self._class_means = np.zeros((self._total_classes, self.feature_dim))
        self._data_memory, self._targets_memory = np.array([]), np.array([])

        for class_idx in range(self._known_classes):
            mask = np.where(dummy_targets == class_idx)[0]
            dd, dt = dummy_data[mask][:m], dummy_targets[mask][:m]
            self._data_memory = (
                np.concatenate((self._data_memory, dd))
                if len(self._data_memory) != 0
                else dd
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, dt))
                if len(self._targets_memory) != 0
                else dt
            )

            # Exemplar mean
            idx_dataset = data_manager.get_dataset(
                [], source="train", mode="test", appendent=(dd, dt)
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)

            self._class_means[class_idx, :] = mean

    def _construct_exemplar(self, data_manager, m):
        logging.info("Constructing exemplars...({} per class)".format(m))
        for class_idx in range(self._known_classes, self._total_classes):
            data, targets, idx_dataset = data_manager.get_dataset(
                np.arange(class_idx, class_idx + 1),
                source="train",
                mode="test",
                ret_data=True,
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            class_mean = np.mean(vectors, axis=0)

            # Select exemplars greedily (herding)
            selected_exemplars = []
            exemplar_vectors = []  # [n, feature_dim]
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of already-selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] running mean if each candidate were added
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))
                selected_exemplars.append(
                    np.array(data[i])
                )  # New object to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # New object to avoid passing by reference

                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection

            selected_exemplars = np.array(selected_exemplars)
            exemplar_targets = np.full(m, class_idx)
            self._data_memory = (
                np.concatenate((self._data_memory, selected_exemplars))
                if len(self._data_memory) != 0
                else selected_exemplars
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, exemplar_targets))
                if len(self._targets_memory) != 0
                else exemplar_targets
            )

            # Exemplar mean
            idx_dataset = data_manager.get_dataset(
                [],
                source="train",
                mode="test",
                appendent=(selected_exemplars, exemplar_targets),
            )
            idx_loader = DataLoader(
                idx_dataset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(idx_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)

            self._class_means[class_idx, :] = mean

    def _construct_exemplar_unified(self, data_manager, m):
        logging.info(
            "Constructing exemplars for new classes...({} per class)".format(m)
        )
        _class_means = np.zeros((self._total_classes, self.feature_dim))

        # Calculate the means of old classes with the newly trained network
        for class_idx in range(self._known_classes):
            mask = np.where(self._targets_memory == class_idx)[0]
            class_data, class_targets = (
                self._data_memory[mask],
                self._targets_memory[mask],
            )

            class_dset = data_manager.get_dataset(
                [], source="train", mode="test", appendent=(class_data, class_targets)
            )
            class_loader = DataLoader(
                class_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(class_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)

            _class_means[class_idx, :] = mean

        # Construct exemplars for new classes and calculate their means
        for class_idx in range(self._known_classes, self._total_classes):
            data, targets, class_dset = data_manager.get_dataset(
                np.arange(class_idx, class_idx + 1),
                source="train",
                mode="test",
                ret_data=True,
            )
            class_loader = DataLoader(
                class_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(class_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            class_mean = np.mean(vectors, axis=0)

            # Select exemplars greedily (herding)
            selected_exemplars = []
            exemplar_vectors = []
            for k in range(1, m + 1):
                S = np.sum(
                    exemplar_vectors, axis=0
                )  # [feature_dim] sum of already-selected exemplar vectors
                mu_p = (vectors + S) / k  # [n, feature_dim] running mean if each candidate were added
                i = np.argmin(np.sqrt(np.sum((class_mean - mu_p) ** 2, axis=1)))

                selected_exemplars.append(
                    np.array(data[i])
                )  # New object to avoid passing by reference
                exemplar_vectors.append(
                    np.array(vectors[i])
                )  # New object to avoid passing by reference

                vectors = np.delete(
                    vectors, i, axis=0
                )  # Remove it to avoid duplicate selection
                data = np.delete(
                    data, i, axis=0
                )  # Remove it to avoid duplicate selection

            selected_exemplars = np.array(selected_exemplars)
            exemplar_targets = np.full(m, class_idx)
            self._data_memory = (
                np.concatenate((self._data_memory, selected_exemplars))
                if len(self._data_memory) != 0
                else selected_exemplars
            )
            self._targets_memory = (
                np.concatenate((self._targets_memory, exemplar_targets))
                if len(self._targets_memory) != 0
                else exemplar_targets
            )

            # Exemplar mean
            exemplar_dset = data_manager.get_dataset(
                [],
                source="train",
                mode="test",
                appendent=(selected_exemplars, exemplar_targets),
            )
            exemplar_loader = DataLoader(
                exemplar_dset, batch_size=batch_size, shuffle=False, num_workers=4
            )
            vectors, _ = self._extract_vectors(exemplar_loader)
            vectors = (vectors.T / (np.linalg.norm(vectors.T, axis=0) + EPSILON)).T
            mean = np.mean(vectors, axis=0)
            mean = mean / np.linalg.norm(mean)

            _class_means[class_idx, :] = mean

        self._class_means = _class_means

def softmax(x):
    """Compute softmax values for each set of scores in x."""
    e_x = np.exp(x - np.max(x))  # subtract max for numerical stability
    return e_x / (e_x.sum(axis=0) + 1e-7)
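
# A minimal, self-contained sketch of the herding-style selection performed in
# _construct_exemplar / _construct_exemplar_unified above, with the data
# pipeline stripped away so the math is easy to verify. `demo_herding` is an
# illustrative name, not part of the class API.
def demo_herding(vectors, m):
    """Greedily pick m rows of `vectors` whose running mean stays closest to
    the full class mean (the selection rule used above)."""
    class_mean = vectors.mean(axis=0)
    selected, remaining = [], vectors.copy()
    for k in range(1, m + 1):
        S = np.sum(selected, axis=0)  # sum of already-selected vectors (0.0 when empty)
        mu_p = (remaining + S) / k  # running mean if each remaining candidate were added
        i = np.argmin(np.linalg.norm(class_mean - mu_p, axis=1))
        selected.append(remaining[i].copy())
        remaining = np.delete(remaining, i, axis=0)
    return np.array(selected)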
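
# A matching sketch of the nearest-mean-of-exemplars (NME) ranking computed by
# _eval_nme: L2-normalize the features, then sort classes by squared Euclidean
# distance to each class mean. `demo_nme` is an illustrative name.
def demo_nme(class_means, features, topk=1):
    feats = (features.T / (np.linalg.norm(features.T, axis=0) + EPSILON)).T
    dists = cdist(class_means, feats, "sqeuclidean")  # [nb_classes, N]
    return np.argsort(dists.T, axis=1)[:, :topk]  # [N, topk], nearest class first


if __name__ == "__main__":
    # Tiny smoke test of the two sketches on random features.
    rng = np.random.default_rng(0)
    feats = rng.normal(size=(100, 8))
    exemplars = demo_herding(feats, m=5)
    means = np.stack([feats[:50].mean(axis=0), feats[50:].mean(axis=0)])
    print(exemplars.shape, demo_nme(means, feats[:3]).shape)  # (5, 8) (3, 1)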