|
|
|
import argparse |
|
import json |
|
import pickle |
|
import random |
|
from itertools import product |
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torchvision.transforms as transforms |
|
from torch.utils.data import DataLoader |
|
from torchvision.datasets import ImageFolder |
|
from tqdm import tqdm |
|
from common.evaluation import Evaluator |
|
from model import chmnet |
|
from model.base.geometry import Geometry |
|
|
|
from Utils import ( |
|
CosineCustomDataset, |
|
PairedLayer4Extractor, |
|
compute_spatial_similarity, |
|
generate_mask, |
|
normalize_array, |
|
get_transforms, |
|
arg_topK, |
|
) |
|
|
|
|
|
random.seed(42) |
|
|
|
|
|
to_np = lambda x: x.data.to("cpu").numpy() |
|
|
|
|
|
chm_args = dict( |
|
{ |
|
"alpha": [0.05, 0.1], |
|
"img_size": 240, |
|
"ktype": "psi", |
|
"load": "pas_psi.pt", |
|
} |
|
) |
|
|
|
|
|
class CHMGridTransfer: |
|
def __init__( |
|
self, |
|
query_image, |
|
support_set, |
|
support_set_labels, |
|
train_folder, |
|
top_N, |
|
top_K, |
|
binarization_threshold, |
|
chm_source_transform, |
|
chm_target_transform, |
|
cosine_source_transform, |
|
cosine_target_transform, |
|
batch_size=64, |
|
): |
|
self.N = top_N |
|
self.K = top_K |
|
self.BS = batch_size |
|
|
|
self.chm_source_transform = chm_source_transform |
|
self.chm_target_transform = chm_target_transform |
|
self.cosine_source_transform = cosine_source_transform |
|
self.cosine_target_transform = cosine_target_transform |
|
|
|
self.source_embeddings = None |
|
self.target_embeddings = None |
|
self.correspondence_map = None |
|
self.similarity_maps = None |
|
self.reverse_similarity_maps = None |
|
self.transferred_points = None |
|
|
|
self.binarization_threshold = binarization_threshold |
|
|
|
|
|
self.q = query_image |
|
self.support_set = support_set |
|
self.labels_ss = support_set_labels |
|
|
|
def build(self): |
|
|
|
test_ds = CosineCustomDataset( |
|
query_image=self.q, |
|
supporting_set=self.support_set, |
|
source_transform=self.chm_source_transform, |
|
target_transform=self.chm_target_transform, |
|
) |
|
test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False) |
|
self.find_correspondences(test_dl) |
|
|
|
|
|
test_ds = CosineCustomDataset( |
|
query_image=self.q, |
|
supporting_set=self.support_set, |
|
source_transform=self.cosine_source_transform, |
|
target_transform=self.cosine_target_transform, |
|
) |
|
test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False) |
|
self.compute_embeddings(test_dl) |
|
self.compute_similarity_map() |
|
|
|
def find_correspondences(self, test_dl): |
|
model = chmnet.CHMNet(chm_args["ktype"]) |
|
model.load_state_dict( |
|
torch.load(chm_args["load"], map_location=torch.device("cpu")) |
|
) |
|
Evaluator.initialize(chm_args["alpha"]) |
|
Geometry.initialize(img_size=chm_args["img_size"]) |
|
|
|
grid_results = [] |
|
transferred_points = [] |
|
|
|
|
|
fixed_src_grid_points = list( |
|
product( |
|
np.linspace(1 + 17, 240 - 17 - 1, 7), |
|
np.linspace(1 + 17, 240 - 17 - 1, 7), |
|
) |
|
) |
|
fixed_src_grid_points = np.asarray(fixed_src_grid_points, dtype=np.float64).T |
|
|
|
with torch.no_grad(): |
|
model.eval() |
|
for idx, batch in enumerate(tqdm(test_dl)): |
|
|
|
keypoints = ( |
|
torch.tensor(fixed_src_grid_points) |
|
.unsqueeze(0) |
|
.repeat(batch["src_img"].shape[0], 1, 1) |
|
) |
|
n_pts = torch.tensor( |
|
np.asarray(batch["src_img"].shape[0] * [49]), dtype=torch.long |
|
) |
|
|
|
corr_matrix = model(batch["src_img"], batch["trg_img"]) |
|
prd_kps = Geometry.transfer_kps( |
|
corr_matrix, keypoints, n_pts, normalized=False |
|
) |
|
transferred_points.append(prd_kps.cpu().numpy()) |
|
for tgt_points in prd_kps: |
|
tgt_grid = [] |
|
for x, y in zip(tgt_points[0], tgt_points[1]): |
|
tgt_grid.append( |
|
[int(((x + 1) / 2.0) * 7), int(((y + 1) / 2.0) * 7)] |
|
) |
|
grid_results.append(tgt_grid) |
|
|
|
self.correspondence_map = grid_results |
|
self.transferred_points = np.vstack(transferred_points) |
|
|
|
def compute_embeddings(self, test_dl): |
|
paired_extractor = PairedLayer4Extractor() |
|
|
|
source_embeddings = [] |
|
target_embeddings = [] |
|
|
|
with torch.no_grad(): |
|
for idx, batch in enumerate(test_dl): |
|
s_e, t_e = paired_extractor((batch["src_img"], batch["trg_img"])) |
|
|
|
source_embeddings.append(s_e) |
|
target_embeddings.append(t_e) |
|
|
|
|
|
self.source_embeddings = torch.cat(source_embeddings, axis=0) |
|
self.target_embeddings = torch.cat(target_embeddings, axis=0) |
|
|
|
def compute_similarity_map(self): |
|
CosSim = nn.CosineSimilarity(dim=0, eps=1e-6) |
|
|
|
similarity_maps = [] |
|
rsimilarity_maps = [] |
|
|
|
grid = [] |
|
for i in range(7): |
|
for j in range(7): |
|
grid.append([i, j]) |
|
|
|
|
|
for i in range(len(self.correspondence_map)): |
|
cosine_map = np.zeros((7, 7)) |
|
reverse_cosine_map = np.zeros((7, 7)) |
|
|
|
|
|
for S, T in zip(grid, self.correspondence_map[i]): |
|
v1 = self.source_embeddings[i][:, S[0], S[1]] |
|
v2 = self.target_embeddings[i][:, T[0], T[1]] |
|
covalue = CosSim(v1, v2) |
|
cosine_map[S[0], S[1]] = covalue |
|
reverse_cosine_map[T[0], T[1]] = covalue |
|
|
|
similarity_maps.append(cosine_map) |
|
rsimilarity_maps.append(reverse_cosine_map) |
|
|
|
self.similarity_maps = similarity_maps |
|
self.reverse_similarity_maps = rsimilarity_maps |
|
|
|
def compute_score_using_cc(self): |
|
|
|
SIMS_source, SIMS_target = [], [] |
|
for i in range(len(self.source_embeddings)): |
|
simA, simB = compute_spatial_similarity( |
|
to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i]) |
|
) |
|
|
|
SIMS_source.append(simA) |
|
SIMS_target.append(simB) |
|
|
|
SIMS_source = np.stack(SIMS_source, axis=0) |
|
|
|
|
|
top_cos_values = [] |
|
|
|
for i in range(len(self.similarity_maps)): |
|
cosine_value = np.multiply( |
|
self.similarity_maps[i], |
|
generate_mask( |
|
normalize_array(SIMS_source[i]), t=self.binarization_threshold |
|
), |
|
) |
|
top_5_indicies = np.argsort(cosine_value.T.reshape(-1))[::-1][:5] |
|
mean_of_top_5 = np.mean( |
|
[cosine_value.T.reshape(-1)[x] for x in top_5_indicies] |
|
) |
|
top_cos_values.append(np.mean(mean_of_top_5)) |
|
|
|
return top_cos_values |
|
|
|
def compute_score_using_custom_points(self, selected_keypoint_masks): |
|
top_cos_values = [] |
|
|
|
for i in range(len(self.similarity_maps)): |
|
cosine_value = np.multiply(self.similarity_maps[i], selected_keypoint_masks) |
|
top_indicies = np.argsort(cosine_value.T.reshape(-1))[::-1] |
|
mean_of_tops = np.mean( |
|
[cosine_value.T.reshape(-1)[x] for x in top_indicies] |
|
) |
|
top_cos_values.append(np.mean(mean_of_tops)) |
|
|
|
return top_cos_values |
|
|
|
def export(self): |
|
storage = { |
|
"N": self.N, |
|
"K": self.K, |
|
"source_embeddings": self.source_embeddings, |
|
"target_embeddings": self.target_embeddings, |
|
"correspondence_map": self.correspondence_map, |
|
"similarity_maps": self.similarity_maps, |
|
"T": self.binarization_threshold, |
|
"query": self.q, |
|
"support_set": self.support_set, |
|
"labels_for_support_set": self.labels_ss, |
|
"rsimilarity_maps": self.reverse_similarity_maps, |
|
"transferred_points": self.transferred_points, |
|
} |
|
|
|
return ModifiableCHMResults(storage) |
|
|
|
|
|
class ModifiableCHMResults: |
|
def __init__(self, storage): |
|
self.N = storage["N"] |
|
self.K = storage["K"] |
|
self.source_embeddings = storage["source_embeddings"] |
|
self.target_embeddings = storage["target_embeddings"] |
|
self.correspondence_map = storage["correspondence_map"] |
|
self.similarity_maps = storage["similarity_maps"] |
|
self.T = storage["T"] |
|
self.q = storage["query"] |
|
self.support_set = storage["support_set"] |
|
self.labels_ss = storage["labels_for_support_set"] |
|
self.rsimilarity_maps = storage["rsimilarity_maps"] |
|
self.transferred_points = storage["transferred_points"] |
|
self.similarity_maps_masked = None |
|
self.SIMS_source = None |
|
self.SIMS_target = None |
|
self.masked_sim_values = [] |
|
self.top_cos_values = [] |
|
|
|
def compute_score_using_cc(self): |
|
|
|
SIMS_source, SIMS_target = [], [] |
|
for i in range(len(self.source_embeddings)): |
|
simA, simB = compute_spatial_similarity( |
|
to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i]) |
|
) |
|
|
|
SIMS_source.append(simA) |
|
SIMS_target.append(simB) |
|
|
|
SIMS_source = np.stack(SIMS_source, axis=0) |
|
SIMS_target = np.stack(SIMS_target, axis=0) |
|
|
|
self.SIMS_source = SIMS_source |
|
self.SIMS_target = SIMS_target |
|
|
|
top_cos_values = [] |
|
|
|
for i in range(len(self.similarity_maps)): |
|
masked_sim_values = np.multiply( |
|
self.similarity_maps[i], |
|
generate_mask(normalize_array(SIMS_source[i]), t=self.T), |
|
) |
|
self.masked_sim_values.append(masked_sim_values) |
|
top_5_indicies = np.argsort(masked_sim_values.T.reshape(-1))[::-1][:5] |
|
mean_of_top_5 = np.mean( |
|
[masked_sim_values.T.reshape(-1)[x] for x in top_5_indicies] |
|
) |
|
top_cos_values.append(np.mean(mean_of_top_5)) |
|
|
|
self.top_cos_values = top_cos_values |
|
|
|
return top_cos_values |
|
|
|
def compute_score_using_custom_points(self, selected_keypoint_masks): |
|
top_cos_values = [] |
|
similarity_maps_masked = [] |
|
|
|
for i in range(len(self.similarity_maps)): |
|
cosine_value = np.multiply(self.similarity_maps[i], selected_keypoint_masks) |
|
similarity_maps_masked.append(cosine_value) |
|
top_indicies = np.argsort(cosine_value.T.reshape(-1))[::-1] |
|
mean_of_tops = np.mean( |
|
[cosine_value.T.reshape(-1)[x] for x in top_indicies] |
|
) |
|
top_cos_values.append(np.mean(mean_of_tops)) |
|
|
|
self.similarity_maps_masked = similarity_maps_masked |
|
return top_cos_values |
|
|
|
def predict_using_cc(self): |
|
top_cos_values = self.compute_score_using_cc() |
|
|
|
prediction = np.argmax( |
|
np.bincount( |
|
[self.labels_ss[x] for x in np.argsort(top_cos_values)[::-1][: self.K]] |
|
) |
|
) |
|
prediction_weight = np.max( |
|
np.bincount( |
|
[self.labels_ss[x] for x in np.argsort(top_cos_values)[::-1][: self.K]] |
|
) |
|
) |
|
|
|
reranked_nns_idx = [x for x in np.argsort(top_cos_values)[::-1]] |
|
reranked_nns_files = [self.support_set[x] for x in reranked_nns_idx] |
|
|
|
topK_idx = [ |
|
x |
|
for x in np.argsort(top_cos_values)[::-1] |
|
if self.labels_ss[x] == prediction |
|
] |
|
topK_files = [self.support_set[x] for x in topK_idx] |
|
topK_cmaps = [self.correspondence_map[x] for x in topK_idx] |
|
topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx] |
|
topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx] |
|
topK_transfered_points = [self.transferred_points[x] for x in topK_idx] |
|
predicted_folder_name = topK_files[0].split("/")[-2] |
|
|
|
return ( |
|
topK_idx, |
|
prediction, |
|
predicted_folder_name, |
|
prediction_weight, |
|
topK_files[: self.K], |
|
reranked_nns_files[: self.K], |
|
topK_cmaps[: self.K], |
|
topK_similarity_maps[: self.K], |
|
topK_rsimilarity_maps[: self.K], |
|
topK_transfered_points[: self.K], |
|
) |
|
|
|
def predict_custom_pairs(self, selected_keypoint_masks): |
|
top_cos_values = self.compute_score_using_custom_points(selected_keypoint_masks) |
|
|
|
|
|
prediction = np.argmax( |
|
np.bincount( |
|
[self.labels_ss[x] for x in np.argsort(top_cos_values)[::-1][: self.K]] |
|
) |
|
) |
|
prediction_weight = np.max( |
|
np.bincount( |
|
[self.labels_ss[x] for x in np.argsort(top_cos_values)[::-1][: self.K]] |
|
) |
|
) |
|
|
|
reranked_nns_idx = [x for x in np.argsort(top_cos_values)[::-1]] |
|
reranked_nns_files = [self.support_set[x] for x in reranked_nns_idx] |
|
|
|
topK_idx = [ |
|
x |
|
for x in np.argsort(top_cos_values)[::-1] |
|
if self.labels_ss[x] == prediction |
|
] |
|
topK_files = [self.support_set[x] for x in topK_idx] |
|
topK_cmaps = [self.correspondence_map[x] for x in topK_idx] |
|
topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx] |
|
topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx] |
|
topK_transferred_points = [self.transferred_points[x] for x in topK_idx] |
|
|
|
topK_masked_sims = [self.similarity_maps_masked[x] for x in topK_idx] |
|
predicted_folder_name = topK_files[0].split("/")[-2] |
|
|
|
non_zero_mask = np.count_nonzero(selected_keypoint_masks) |
|
|
|
return ( |
|
topK_idx, |
|
prediction, |
|
predicted_folder_name, |
|
prediction_weight, |
|
topK_files[: self.K], |
|
reranked_nns_files[: self.K], |
|
topK_cmaps[: self.K], |
|
topK_similarity_maps[: self.K], |
|
topK_rsimilarity_maps[: self.K], |
|
topK_transferred_points[: self.K], |
|
topK_masked_sims[: self.K], |
|
non_zero_mask, |
|
) |
|
|
|
|
|
def export_visualizations_results( |
|
reranker_output, |
|
knn_predicted_label, |
|
knn_confidence, |
|
topK_knns, |
|
K=20, |
|
N=50, |
|
T=0.55, |
|
): |
|
""" |
|
Export all details for visualization and analysis |
|
""" |
|
|
|
non_zero_mask = 5 |
|
( |
|
topK_idx, |
|
p, |
|
pfn, |
|
pr, |
|
rfiles, |
|
reranked_nns, |
|
cmaps, |
|
sims, |
|
rsims, |
|
trns_kpts, |
|
) = reranker_output.predict_using_cc() |
|
|
|
MASKED_COSINE_VALUES = [ |
|
np.multiply( |
|
sims[X], |
|
generate_mask( |
|
normalize_array(reranker_output.SIMS_source[topK_idx[X]]), t=T |
|
), |
|
) |
|
for X in range(len(sims)) |
|
] |
|
|
|
list_of_source_points = [] |
|
list_of_target_points = [] |
|
|
|
for CK in range(len(sims)): |
|
target_keypoints = [] |
|
topk_index = arg_topK(MASKED_COSINE_VALUES[CK], topK=non_zero_mask) |
|
|
|
for i in range(non_zero_mask): |
|
|
|
x, y = trns_kpts[CK].T[topk_index[i]] |
|
Ptarget = int(((x + 1) / 2.0) * 240), int(((y + 1) / 2.0) * 240) |
|
target_keypoints.append(Ptarget) |
|
|
|
|
|
a = np.linspace(1 + 17, 240 - 17 - 1, 7) |
|
b = np.linspace(1 + 17, 240 - 17 - 1, 7) |
|
point_list = list(product(a, b)) |
|
|
|
list_of_source_points.append(np.asarray([point_list[x] for x in topk_index])) |
|
list_of_target_points.append(np.asarray(target_keypoints)) |
|
|
|
|
|
detailed_output = { |
|
"q": reranker_output.q, |
|
"K": K, |
|
"N": N, |
|
"knn-prediction": knn_predicted_label, |
|
"knn-prediction-confidence": knn_confidence, |
|
"knn-nearest-neighbors": topK_knns, |
|
"chm-prediction": pfn, |
|
"chm-prediction-confidence": pr, |
|
"chm-nearest-neighbors": rfiles, |
|
"chm-nearest-neighbors-all": reranked_nns, |
|
"correspondance_map": cmaps, |
|
"masked_cos_values": MASKED_COSINE_VALUES, |
|
"src-keypoints": list_of_source_points, |
|
"tgt-keypoints": list_of_target_points, |
|
"non_zero_mask": non_zero_mask, |
|
"transferred_kpoints": trns_kpts, |
|
} |
|
|
|
return detailed_output |
|
|
|
|
|
def chm_classify_and_visualize( |
|
query_image, kNN_results, support, TRAIN_SET, N=50, K=20, T=0.55, BS=64 |
|
): |
|
global chm_args |
|
chm_src_t, chm_tgt_t, cos_src_t, cos_tgt_t = get_transforms("single", chm_args) |
|
knn_predicted_label, knn_confidence, topK_knns = kNN_results |
|
|
|
reranker = CHMGridTransfer( |
|
query_image=query_image, |
|
support_set=support[0], |
|
support_set_labels=support[1], |
|
train_folder=TRAIN_SET, |
|
top_N=N, |
|
top_K=K, |
|
binarization_threshold=T, |
|
chm_source_transform=chm_src_t, |
|
chm_target_transform=chm_tgt_t, |
|
cosine_source_transform=cos_src_t, |
|
cosine_target_transform=cos_tgt_t, |
|
batch_size=BS, |
|
) |
|
|
|
|
|
reranker.build() |
|
|
|
exported_reranker = reranker.export() |
|
|
|
|
|
output = export_visualizations_results( |
|
exported_reranker, |
|
knn_predicted_label, |
|
knn_confidence, |
|
topK_knns, |
|
K, |
|
N, |
|
T, |
|
) |
|
|
|
return output |
|
|