Vincentqyw
fix: roma
8b973ee
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from .geom import rnd_sample, interpolate
class MultiSampler(nn.Module):
"""Similar to NghSampler, but doesnt warp the 2nd image.
Distance to GT => 0 ... pos_d ... neg_d ... ngh
Pixel label => + + + + + + 0 0 - - - - - - -
Subsample on query side: if > 0, regular grid
< 0, random points
In both cases, the number of query points is = W*H/subq**2
"""
def __init__(
self,
ngh,
subq=1,
subd=1,
pos_d=0,
neg_d=2,
border=None,
maxpool_pos=True,
subd_neg=0,
):
nn.Module.__init__(self)
assert 0 <= pos_d < neg_d <= (ngh if ngh else 99)
self.ngh = ngh
self.pos_d = pos_d
self.neg_d = neg_d
assert subd <= ngh or ngh == 0
assert subq != 0
self.sub_q = subq
self.sub_d = subd
self.sub_d_neg = subd_neg
if border is None:
border = ngh
assert border >= ngh, "border has to be larger than ngh"
self.border = border
self.maxpool_pos = maxpool_pos
self.precompute_offsets()
def precompute_offsets(self):
pos_d2 = self.pos_d**2
neg_d2 = self.neg_d**2
rad2 = self.ngh**2
rad = (self.ngh // self.sub_d) * self.ngh # make an integer multiple
pos = []
neg = []
for j in range(-rad, rad + 1, self.sub_d):
for i in range(-rad, rad + 1, self.sub_d):
d2 = i * i + j * j
if d2 <= pos_d2:
pos.append((i, j))
elif neg_d2 <= d2 <= rad2:
neg.append((i, j))
self.register_buffer("pos_offsets", torch.LongTensor(pos).view(-1, 2).t())
self.register_buffer("neg_offsets", torch.LongTensor(neg).view(-1, 2).t())
def forward(
self,
feat0,
feat1,
noise_feat0,
noise_feat1,
conf0,
conf1,
noise_conf0,
noise_conf1,
pos0,
pos1,
B,
H,
W,
N=2500,
):
pscores_ls, nscores_ls, distractors_ls = [], [], []
valid_feat0_ls = []
noise_pscores_ls, noise_nscores_ls, noise_distractors_ls = [], [], []
valid_noise_feat0_ls = []
valid_pos1_ls, valid_pos2_ls = [], []
qconf_ls = []
noise_qconf_ls = []
mask_ls = []
for i in range(B):
tmp_mask = (
(pos0[i][:, 1] >= self.border)
* (pos0[i][:, 1] < W - self.border)
* (pos0[i][:, 0] >= self.border)
* (pos0[i][:, 0] < H - self.border)
)
selected_pos0 = pos0[i][tmp_mask]
selected_pos1 = pos1[i][tmp_mask]
valid_pos0, valid_pos1 = rnd_sample([selected_pos0, selected_pos1], N)
# sample features from first image
valid_feat0 = interpolate(valid_pos0 / 4, feat0[i]) # [N, 128]
valid_feat0 = F.normalize(valid_feat0, p=2, dim=-1) # [N, 128]
qconf = interpolate(valid_pos0 / 4, conf0[i])
valid_noise_feat0 = interpolate(valid_pos0 / 4, noise_feat0[i]) # [N, 128]
valid_noise_feat0 = F.normalize(valid_noise_feat0, p=2, dim=-1) # [N, 128]
noise_qconf = interpolate(valid_pos0 / 4, noise_conf0[i])
# sample GT from second image
mask = (
(valid_pos1[:, 1] >= 0)
* (valid_pos1[:, 1] < W)
* (valid_pos1[:, 0] >= 0)
* (valid_pos1[:, 0] < H)
)
def clamp(xy):
xy = xy
torch.clamp(xy[0], 0, H - 1, out=xy[0])
torch.clamp(xy[1], 0, W - 1, out=xy[1])
return xy
# compute positive scores
valid_pos1p = clamp(
valid_pos1.t()[:, None, :]
+ self.pos_offsets[:, :, None].to(valid_pos1.device)
) # [2, 29, N]
valid_pos1p = valid_pos1p.permute(1, 2, 0).reshape(
-1, 2
) # [29, N, 2] -> [29*N, 2]
valid_feat1p = interpolate(valid_pos1p / 4, feat1[i]).reshape(
self.pos_offsets.shape[-1], -1, 128
) # [29, N, 128]
valid_feat1p = F.normalize(valid_feat1p, p=2, dim=-1) # [29, N, 128]
valid_noise_feat1p = interpolate(valid_pos1p / 4, feat1[i]).reshape(
self.pos_offsets.shape[-1], -1, 128
) # [29, N, 128]
valid_noise_feat1p = F.normalize(
valid_noise_feat1p, p=2, dim=-1
) # [29, N, 128]
pscores = (
(valid_feat0[None, :, :] * valid_feat1p).sum(dim=-1).t()
) # [N, 29]
pscores, pos = pscores.max(dim=1, keepdim=True)
sel = clamp(
valid_pos1.t() + self.pos_offsets[:, pos.view(-1)].to(valid_pos1.device)
)
qconf = (qconf + interpolate(sel.t() / 4, conf1[i])) / 2
noise_pscores = (
(valid_noise_feat0[None, :, :] * valid_noise_feat1p).sum(dim=-1).t()
) # [N, 29]
noise_pscores, noise_pos = noise_pscores.max(dim=1, keepdim=True)
noise_sel = clamp(
valid_pos1.t()
+ self.pos_offsets[:, noise_pos.view(-1)].to(valid_pos1.device)
)
noise_qconf = (
noise_qconf + interpolate(noise_sel.t() / 4, noise_conf1[i])
) / 2
# compute negative scores
valid_pos1n = clamp(
valid_pos1.t()[:, None, :]
+ self.neg_offsets[:, :, None].to(valid_pos1.device)
) # [2, 29, N]
valid_pos1n = valid_pos1n.permute(1, 2, 0).reshape(
-1, 2
) # [29, N, 2] -> [29*N, 2]
valid_feat1n = interpolate(valid_pos1n / 4, feat1[i]).reshape(
self.neg_offsets.shape[-1], -1, 128
) # [29, N, 128]
valid_feat1n = F.normalize(valid_feat1n, p=2, dim=-1) # [29, N, 128]
nscores = (
(valid_feat0[None, :, :] * valid_feat1n).sum(dim=-1).t()
) # [N, 29]
valid_noise_feat1n = interpolate(valid_pos1n / 4, noise_feat1[i]).reshape(
self.neg_offsets.shape[-1], -1, 128
) # [29, N, 128]
valid_noise_feat1n = F.normalize(
valid_noise_feat1n, p=2, dim=-1
) # [29, N, 128]
noise_nscores = (
(valid_noise_feat0[None, :, :] * valid_noise_feat1n).sum(dim=-1).t()
) # [N, 29]
if self.sub_d_neg:
valid_pos2 = rnd_sample([selected_pos1], N)[0]
distractors = interpolate(valid_pos2 / 4, feat1[i])
distractors = F.normalize(distractors, p=2, dim=-1)
noise_distractors = interpolate(valid_pos2 / 4, noise_feat1[i])
noise_distractors = F.normalize(noise_distractors, p=2, dim=-1)
pscores_ls.append(pscores)
nscores_ls.append(nscores)
distractors_ls.append(distractors)
valid_feat0_ls.append(valid_feat0)
noise_pscores_ls.append(noise_pscores)
noise_nscores_ls.append(noise_nscores)
noise_distractors_ls.append(noise_distractors)
valid_noise_feat0_ls.append(valid_noise_feat0)
valid_pos1_ls.append(valid_pos1)
valid_pos2_ls.append(valid_pos2)
qconf_ls.append(qconf)
noise_qconf_ls.append(noise_qconf)
mask_ls.append(mask)
N = np.min([len(i) for i in qconf_ls])
# merge batches
qconf = torch.stack([i[:N] for i in qconf_ls], dim=0).squeeze(-1)
mask = torch.stack([i[:N] for i in mask_ls], dim=0)
pscores = torch.cat([i[:N] for i in pscores_ls], dim=0)
nscores = torch.cat([i[:N] for i in nscores_ls], dim=0)
distractors = torch.cat([i[:N] for i in distractors_ls], dim=0)
valid_feat0 = torch.cat([i[:N] for i in valid_feat0_ls], dim=0)
valid_pos1 = torch.cat([i[:N] for i in valid_pos1_ls], dim=0)
valid_pos2 = torch.cat([i[:N] for i in valid_pos2_ls], dim=0)
noise_qconf = torch.stack([i[:N] for i in noise_qconf_ls], dim=0).squeeze(-1)
noise_pscores = torch.cat([i[:N] for i in noise_pscores_ls], dim=0)
noise_nscores = torch.cat([i[:N] for i in noise_nscores_ls], dim=0)
noise_distractors = torch.cat([i[:N] for i in noise_distractors_ls], dim=0)
valid_noise_feat0 = torch.cat([i[:N] for i in valid_noise_feat0_ls], dim=0)
# remove scores that corresponds to positives or nulls
dscores = torch.matmul(valid_feat0, distractors.t())
noise_dscores = torch.matmul(valid_noise_feat0, noise_distractors.t())
dis2 = (valid_pos2[:, 1] - valid_pos1[:, 1][:, None]) ** 2 + (
valid_pos2[:, 0] - valid_pos1[:, 0][:, None]
) ** 2
b = torch.arange(B, device=dscores.device)[:, None].expand(B, N).reshape(-1)
dis2 += (b != b[:, None]).long() * self.neg_d**2
dscores[dis2 < self.neg_d**2] = 0
noise_dscores[dis2 < self.neg_d**2] = 0
scores = torch.cat((pscores, nscores, dscores), dim=1)
noise_scores = torch.cat((noise_pscores, noise_nscores, noise_dscores), dim=1)
gt = scores.new_zeros(scores.shape, dtype=torch.uint8)
gt[:, : pscores.shape[1]] = 1
return scores, noise_scores, gt, mask, qconf, noise_qconf