Spaces:
Running
Running
from argparse import Namespace | |
import os | |
import torch | |
import cv2 | |
from time import time | |
from pathlib import Path | |
import matplotlib.cm as cm | |
import numpy as np | |
from src.models.topic_fm import TopicFM | |
from src import get_model_cfg | |
from .base import Viz | |
from src.utils.metrics import compute_symmetrical_epipolar_errors, compute_pose_errors | |
from src.utils.plotting import draw_topics, draw_topicfm_demo, error_colormap | |
class VizTopicFM(Viz): | |
def __init__(self, args): | |
super().__init__() | |
if type(args) == dict: | |
args = Namespace(**args) | |
self.match_threshold = args.match_threshold | |
self.n_sampling_topics = args.n_sampling_topics | |
self.show_n_topics = args.show_n_topics | |
# Load model | |
conf = dict(get_model_cfg()) | |
conf['match_coarse']['thr'] = self.match_threshold | |
conf['coarse']['n_samples'] = self.n_sampling_topics | |
print("model config: ", conf) | |
self.model = TopicFM(config=conf) | |
ckpt_dict = torch.load(args.ckpt) | |
self.model.load_state_dict(ckpt_dict['state_dict']) | |
self.model = self.model.eval().to(self.device) | |
# Name the method | |
# self.ckpt_name = args.ckpt.split('/')[-1].split('.')[0] | |
self.name = 'TopicFM' | |
print(f'Initialize {self.name}') | |
def match_and_draw(self, data_dict, root_dir=None, ground_truth=False, measure_time=False, viz_matches=True): | |
if measure_time: | |
torch.cuda.synchronize() | |
start = torch.cuda.Event(enable_timing=True) | |
end = torch.cuda.Event(enable_timing=True) | |
start.record() | |
self.model(data_dict) | |
if measure_time: | |
torch.cuda.synchronize() | |
end.record() | |
torch.cuda.synchronize() | |
self.time_stats.append(start.elapsed_time(end)) | |
kpts0 = data_dict['mkpts0_f'].cpu().numpy() | |
kpts1 = data_dict['mkpts1_f'].cpu().numpy() | |
img_name0, img_name1 = list(zip(*data_dict['pair_names']))[0] | |
img0 = cv2.imread(os.path.join(root_dir, img_name0)) | |
img1 = cv2.imread(os.path.join(root_dir, img_name1)) | |
if str(data_dict["dataset_name"][0]).lower() == 'scannet': | |
img0 = cv2.resize(img0, (640, 480)) | |
img1 = cv2.resize(img1, (640, 480)) | |
if viz_matches: | |
saved_name = "_".join([img_name0.split('/')[-1].split('.')[0], img_name1.split('/')[-1].split('.')[0]]) | |
folder_matches = os.path.join(root_dir, "{}_viz_matches".format(self.name)) | |
if not os.path.exists(folder_matches): | |
os.makedirs(folder_matches) | |
path_to_save_matches = os.path.join(folder_matches, "{}.png".format(saved_name)) | |
if ground_truth: | |
compute_symmetrical_epipolar_errors(data_dict) # compute epi_errs for each match | |
compute_pose_errors(data_dict) # compute R_errs, t_errs, pose_errs for each pair | |
epi_errors = data_dict['epi_errs'].cpu().numpy() | |
R_errors, t_errors = data_dict['R_errs'][0], data_dict['t_errs'][0] | |
self.draw_matches(kpts0, kpts1, img0, img1, epi_errors, path=path_to_save_matches, | |
R_errs=R_errors, t_errs=t_errors) | |
# compute evaluation metrics | |
rel_pair_names = list(zip(*data_dict['pair_names'])) | |
bs = data_dict['image0'].size(0) | |
metrics = { | |
# to filter duplicate pairs caused by DistributedSampler | |
'identifiers': ['#'.join(rel_pair_names[b]) for b in range(bs)], | |
'epi_errs': [data_dict['epi_errs'][data_dict['m_bids'] == b].cpu().numpy() for b in range(bs)], | |
'R_errs': data_dict['R_errs'], | |
't_errs': data_dict['t_errs'], | |
'inliers': data_dict['inliers']} | |
self.eval_stats.append({'metrics': metrics}) | |
else: | |
m_conf = 1 - data_dict["mconf"].cpu().numpy() | |
self.draw_matches(kpts0, kpts1, img0, img1, m_conf, path=path_to_save_matches, conf_thr=0.4) | |
if self.show_n_topics > 0: | |
folder_topics = os.path.join(root_dir, "{}_viz_topics".format(self.name)) | |
if not os.path.exists(folder_topics): | |
os.makedirs(folder_topics) | |
draw_topics(data_dict, img0, img1, saved_folder=folder_topics, show_n_topics=self.show_n_topics, | |
saved_name=saved_name) | |
def run_demo(self, dataloader, writer=None, output_dir=None, no_display=False, skip_frames=1): | |
data_dict = next(dataloader) | |
frame_id = 0 | |
last_image_id = 0 | |
img0 = np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32) / 255 | |
frame_tensor = data_dict["img"].to(self.device) | |
pair_data = {'image0': frame_tensor} | |
last_frame = cv2.resize(img0, (frame_tensor.shape[-1], frame_tensor.shape[-2]), cv2.INTER_LINEAR) | |
if output_dir is not None: | |
print('==> Will write outputs to {}'.format(output_dir)) | |
Path(output_dir).mkdir(exist_ok=True) | |
# Create a window to display the demo. | |
if not no_display: | |
window_name = 'Topic-assisted Feature Matching' | |
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) | |
cv2.resizeWindow(window_name, (640 * 2, 480 * 2)) | |
else: | |
print('Skipping visualization, will not show a GUI.') | |
# Print the keyboard help menu. | |
print('==> Keyboard control:\n' | |
'\tn: select the current frame as the reference image (left)\n' | |
'\tq: quit') | |
# vis_range = [kwargs["bottom_k"], kwargs["top_k"]] | |
while True: | |
frame_id += 1 | |
if frame_id == len(dataloader): | |
print('Finished demo_loftr.py') | |
break | |
data_dict = next(dataloader) | |
if frame_id % skip_frames != 0: | |
# print("Skipping frame.") | |
continue | |
stem0, stem1 = last_image_id, data_dict["id"][0].item() - 1 | |
frame = np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32) / 255 | |
frame_tensor = data_dict["img"].to(self.device) | |
frame = cv2.resize(frame, (frame_tensor.shape[-1], frame_tensor.shape[-2]), interpolation=cv2.INTER_LINEAR) | |
pair_data = {**pair_data, 'image1': frame_tensor} | |
self.model(pair_data) | |
total_n_matches = len(pair_data['mkpts0_f']) | |
mkpts0 = pair_data['mkpts0_f'].cpu().numpy() # [vis_range[0]:vis_range[1]] | |
mkpts1 = pair_data['mkpts1_f'].cpu().numpy() # [vis_range[0]:vis_range[1]] | |
mconf = pair_data['mconf'].cpu().numpy() # [vis_range[0]:vis_range[1]] | |
# Normalize confidence. | |
if len(mconf) > 0: | |
mconf = 1 - mconf | |
# alpha = 0 | |
# color = cm.jet(mconf, alpha=alpha) | |
color = error_colormap(mconf, thr=0.4, alpha=0.1) | |
text = [ | |
f'Topics', | |
'#Matches: {}'.format(total_n_matches), | |
] | |
out = draw_topicfm_demo(pair_data, last_frame, frame, mkpts0, mkpts1, color, text, | |
show_n_topics=4, path=None) | |
if not no_display: | |
if writer is not None: | |
writer.write(out) | |
cv2.imshow('TopicFM Matches', out) | |
key = chr(cv2.waitKey(10) & 0xFF) | |
if key == 'q': | |
if writer is not None: | |
writer.release() | |
print('Exiting...') | |
break | |
elif key == 'n': | |
pair_data['image0'] = frame_tensor | |
last_frame = frame | |
last_image_id = (data_dict["id"][0].item() - 1) | |
frame_id_left = frame_id | |
elif output_dir is not None: | |
stem = 'matches_{:06}_{:06}'.format(stem0, stem1) | |
out_file = str(Path(output_dir, stem + '.png')) | |
print('\nWriting image to {}'.format(out_file)) | |
cv2.imwrite(out_file, out) | |
else: | |
raise ValueError("output_dir is required when no display is given.") | |
cv2.destroyAllWindows() | |
if writer is not None: | |
writer.release() | |