|
|
|
|
|
|
|
|
|
import pdb, sys, os |
|
import argparse |
|
import numpy as np |
|
from scipy.sparse import coo_matrix, csr_matrix, triu, csgraph |
|
|
|
import core.functional as myF |
|
from tools.common import image, image_with_trf |
|
from tools.viz import dbgfig, show_correspondences |
|
|
|
|
|
def arg_parser(): |
|
parser = argparse.ArgumentParser("Post-filtering of Deep matching correspondences") |
|
|
|
parser.add_argument("--img1", required=True, help="path to first image") |
|
parser.add_argument("--img2", required=True, help="path to second image") |
|
parser.add_argument("--resize", default=0, type=int, help="prior image downsize (0 if recursive)") |
|
parser.add_argument("--corres", required=True, help="input path") |
|
parser.add_argument("--output", default="", help="filtered corres output") |
|
|
|
parser.add_argument("--locality", type=float, default=2, help="tolerance to deformation") |
|
parser.add_argument("--min-cc-size", type=int, default=50, help="min connex-component size") |
|
parser.add_argument("--densify", default='no', choices=['no','full','cc','convex'], help="output pixel-dense corres field") |
|
parser.add_argument("--dense-side", default='left', choices=['left','right'], help="img to densify") |
|
|
|
parser.add_argument("--verbose", "-v", type=int, default=0, help="verbosity level") |
|
parser.add_argument("--dbg", type=str, nargs='+', default=(), help="debug options") |
|
return parser |
|
|
|
|
|
def main(args): |
|
import test_singlescale as pump |
|
corres = np.load(args.corres)['corres'] |
|
imgs = tuple(map(image, pump.Main.load_images(args))) |
|
|
|
if dbgfig('raw',args.dbg): |
|
show_correspondences(*imgs, corres) |
|
|
|
corres = filter_corres( *imgs, corres, |
|
locality=args.locality, min_cc_size=args.min_cc_size, |
|
densify=args.densify, dense_side=args.dense_side, |
|
verbose=args.verbose, dbg=args.dbg) |
|
|
|
if dbgfig('viz',args.dbg): |
|
show_correspondences(*imgs, corres) |
|
|
|
return pump.save_output( args, corres ) |
|
|
|
|
|
def filter_corres( img0, img1, corres, |
|
locality = None, |
|
min_cc_size = None, |
|
densify = None, |
|
dense_side = None, |
|
verbose = 0, dbg=()): |
|
|
|
if None in (locality, min_cc_size, densify, dense_side): |
|
default_params = arg_parser() |
|
locality = locality or default_params.get_default('locality') |
|
min_cc_size = min_cc_size or default_params.get_default('min_cc_size') |
|
densify = densify or default_params.get_default('densify') |
|
dense_side = dense_side or default_params.get_default('dense_side') |
|
|
|
img0, trf0 = img0 if isinstance(img0,tuple) else (img0, np.eye(3)) |
|
img1, trf1 = img1 if isinstance(img1,tuple) else (img1, np.eye(3)) |
|
assert isinstance(img0, np.ndarray) and isinstance(img1, np.ndarray) |
|
|
|
corres = myF.affmul((np.linalg.inv(trf0),np.linalg.inv(trf1)), corres) |
|
n_corres = len(corres) |
|
if verbose: print(f'>> input: {len(corres)} correspondences') |
|
|
|
graph = compute_graph(corres, max_dis=locality*4) |
|
if verbose: print(f'>> {locality=}: {graph.nnz} nodes in graph') |
|
|
|
cc_sizes = measure_connected_components(graph) |
|
corres[:,4] += np.log2(cc_sizes) |
|
corres = corres[cc_sizes > min_cc_size] |
|
if verbose: print(f'>> {min_cc_size=}: remaining {len(corres)} correspondences') |
|
|
|
final = myF.affmul((trf0,trf1), corres) |
|
|
|
if densify != 'no': |
|
|
|
if dense_side == 'right': |
|
final = final[:,[2,3,0,1]] |
|
H = round(img1.shape[0] / trf1[1,1]) |
|
W = round(img1.shape[1] / trf1[0,0]) |
|
else: |
|
H = round(img0.shape[0] / trf0[1,1]) |
|
W = round(img0.shape[1] / trf0[0,0]) |
|
|
|
if densify == 'cc': |
|
assert False, 'todo' |
|
elif densify in (True, 'full', 'convex'): |
|
|
|
final = densify_corres( final, (H, W), full=(densify!='convex') ) |
|
else: |
|
raise ValueError(f'Bad mode for {densify=}') |
|
|
|
if dense_side == 'right': |
|
final = final[:,[2,3,0,1]] |
|
|
|
return final |
|
|
|
|
|
def compute_graph(corres, max_dis=10, min_ang=90): |
|
""" 4D distances (corres can only be connected to same scale) |
|
using sparse matrices for efficiency |
|
|
|
step1: build horizontal and vertical binning, binsize = max_dis |
|
add in each bin all neighbor bins |
|
step2: for each corres, we can intersect 2 bins to get a short list of candidates |
|
step3: verify euclidean distance < maxdis (optional?) |
|
""" |
|
def bin_positions(pos): |
|
|
|
bin_indices = np.int32(pos.clip(min=0) // max_dis) + 1 |
|
cols = np.arange(len(pos)) |
|
|
|
|
|
res = csr_matrix((np.ones(len(bin_indices)*3,dtype=np.float32), |
|
(np.r_[bin_indices-1, bin_indices, bin_indices+1], np.r_[cols,cols,cols])), |
|
shape=(bin_indices.max()+2 if bin_indices.size else 1, len(pos))) |
|
|
|
return res, bin_indices |
|
|
|
|
|
x1_bins = bin_positions(corres[:,0]) |
|
y1_bins = bin_positions(corres[:,1]) |
|
x2_bins = bin_positions(corres[:,2]) |
|
y2_bins = bin_positions(corres[:,3]) |
|
|
|
def row_indices(ngh): |
|
res = np.bincount(ngh.indptr[1:-1], minlength=ngh.indptr[-1])[:-1] |
|
return res.cumsum() |
|
|
|
def compute_dist( ngh, pts, scale=None ): |
|
|
|
x_pos = pts[ngh.indices,0] |
|
y_pos = pts[ngh.indices,1] |
|
|
|
|
|
rows = row_indices(ngh) |
|
x_pos -= pts[rows, 0] |
|
y_pos -= pts[rows, 1] |
|
dis = np.sqrt(np.square(x_pos) + np.square(y_pos)) |
|
if scale is not None: |
|
|
|
dis *= (scale[rows] + scale[ngh.indices]) / 2 |
|
|
|
return normed(np.c_[x_pos, y_pos]), dis |
|
|
|
def Rot( ngh, degrees ): |
|
rows = row_indices(ngh) |
|
rad = degrees * np.pi / 180 |
|
rad = (rad[rows] + rad[ngh.indices]) / 2 |
|
cos, sin = np.cos(rad), np.sin(rad) |
|
return np.float32(((cos, -sin), (sin,cos))).transpose(2,0,1) |
|
|
|
def match(xbins, ybins, pt1, pt2, way): |
|
xb, ixb = xbins |
|
yb, iyb = ybins |
|
|
|
|
|
ngh = xb[ixb].multiply( yb[iyb] ) |
|
ngh = triu(ngh, k=1).tocsr() |
|
|
|
|
|
|
|
vec1, d1 = compute_dist(ngh, pt1) |
|
|
|
scale, rot = myF.decode_scale_rot(corres[:,5]) |
|
vec2, d2 = compute_dist(ngh, pt2, scale=scale**(-way)) |
|
ang = np.einsum('ik,ik->i', (vec1[:,None] @ Rot(ngh,way*rot))[:,0], vec2) |
|
|
|
valid = (d1 <= max_dis) & (d2 <= max_dis) & (ang >= np.cos(min_ang*np.pi/180)) |
|
res = csr_matrix((valid, ngh.indices, ngh.indptr), shape=ngh.shape) |
|
res.eliminate_zeros() |
|
return res |
|
|
|
|
|
ngh1 = match(x1_bins, y1_bins, corres[:,0:2], corres[:,2:4], way=+1) |
|
ngh2 = match(x2_bins, y2_bins, corres[:,2:4], corres[:,0:2], way=-1).T |
|
|
|
return ngh1 + ngh2 |
|
|
|
|
|
def measure_connected_components(graph, dbg=()): |
|
|
|
nc, labels = csgraph.connected_components(graph, directed=False) |
|
|
|
|
|
count = np.bincount(labels) |
|
|
|
return count[labels] |
|
|
|
def normed( mat ): |
|
return mat / np.linalg.norm(mat, axis=-1, keepdims=True).clip(min=1e-16) |
|
|
|
|
|
def densify_corres( corres, shape, full=True ): |
|
from scipy.interpolate import LinearNDInterpolator |
|
from scipy.spatial import cKDTree as KDTree |
|
|
|
assert len(corres) > 3, 'Not enough corres for densification' |
|
H, W = shape |
|
|
|
interp = LinearNDInterpolator(corres[:,0:2], corres[:,2:4]) |
|
X, Y = np.mgrid[0:H, 0:W][::-1] |
|
p1 = np.c_[X.ravel(), Y.ravel()] |
|
p2 = interp(X, Y) |
|
|
|
p2 = p2.reshape(-1,2) |
|
invalid = np.isnan(p2).any(axis=1) |
|
|
|
if full: |
|
|
|
badp = p1[invalid] |
|
tree = KDTree(corres[:,0:2]) |
|
_, nn = tree.query(badp, 3) |
|
corflow = corres[:,2:4] - corres[:,0:2] |
|
p2.reshape(-1,2)[invalid] = corflow[nn].mean(axis=1) + p1[invalid] |
|
else: |
|
|
|
p1, p2 = p1[~invalid], p2[~invalid] |
|
|
|
|
|
return np.c_[p1, p2] |
|
|
|
|
|
if __name__ == '__main__': |
|
main(arg_parser().parse_args()) |
|
|