# -*- coding: UTF-8 -*-
'''=================================================
@Project -> File   pram -> basicdataset
@IDE               PyCharm
@Author            fx221@cam.ac.uk
@Date              29/01/2024 14:27
=================================================='''
import os.path as osp

import cv2
import numpy as np
import torchvision.transforms as tvt
import torchvision.transforms.functional as tvf

from colmap_utils.read_write_model import qvec2rotmat, read_model
from dataset.utils import normalize_size


class BasicDataset:
    def __init__(self,
                 img_list_fn,
                 feature_dir,
                 sfm_path,
                 seg_fn,
                 dataset_path,
                 n_class,
                 dataset,
                 nfeatures=1024,
                 query_p3d_fn=None,
                 train=True,
                 with_aug=False,
                 min_inliers=0,
                 max_inliers=4096,
                 random_inliers=False,
                 jitter_params=None,
                 scale_params=None,
                 image_dim=1,
                 pre_load=False,
                 query_info_path=None,
                 sc_mean_scale_fn=None,
                 ):
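        """A COLMAP-backed dataset of keypoints with per-point segmentation labels.

        Argument summary (inferred from the implementation below):
            img_list_fn: text file listing one image name per line.
            feature_dir: directory of cached per-image feature files (.npy).
            sfm_path: COLMAP model directory, read with ext='.bin'.
            seg_fn: .npy file with 3D point ids ('id') and labels ('label').
            dataset_path: root directory containing the images.
            n_class: number of segmentation classes; label 0 means invalid.
            nfeatures: number of keypoints sampled per training image.
            query_p3d_fn: .npy with per-query 3D point ids (evaluation only).
            query_info_path: query intrinsics file (evaluation only).
            sc_mean_scale_fn: text file with the scene-coordinate mean and scale.
        """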
        self.n_class = n_class
        self.train = train
        self.min_inliers = min_inliers
        self.max_inliers = max_inliers if max_inliers < nfeatures else nfeatures
        self.random_inliers = random_inliers
        self.dataset_path = dataset_path
        self.with_aug = with_aug
        self.dataset = dataset
        self.jitter_params = jitter_params
        self.scale_params = scale_params
        self.image_dim = image_dim
        self.image_prefix = ''

        train_transforms = []
        if self.with_aug:
            train_transforms.append(tvt.ColorJitter(
                brightness=jitter_params['brightness'],
                contrast=jitter_params['contrast'],
                saturation=jitter_params['saturation'],
                hue=jitter_params['hue']))
            if jitter_params['blur'] > 0:
                train_transforms.append(tvt.GaussianBlur(kernel_size=int(jitter_params['blur'])))
        self.train_transforms = tvt.Compose(train_transforms)

        # only for testing of query images
        if not self.train:
            data = np.load(query_p3d_fn, allow_pickle=True)[()]
            self.img_p3d = data
        else:
            self.img_p3d = {}

        self.img_fns = []
        with open(img_list_fn, 'r') as f:
            lines = f.readlines()
            for l in lines:
                self.img_fns.append(l.strip())
        print('Load {} images from {} for {}...'.format(len(self.img_fns), dataset,
                                                        'training' if train else 'eval'))
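        # Note (assumption on data layout): each line of img_list_fn is an image
        # path relative to dataset_path, e.g. 'db/0001.jpg'; when cached features
        # are loaded later, '/' in the name is replaced by '+' to form the .npy
        # file name.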

        self.feats = {}
        # The SfM model, 3D-point segmentation labels, and scene-coordinate
        # mean/scale are loaded unconditionally: the evaluation path
        # (get_item_test) also reads p3d_seg, p3d_xyzs, mean_xyz, and scale_xyz.
        self.cameras, self.images, point3Ds = read_model(path=sfm_path, ext='.bin')
        self.name_to_id = {image.name: i for i, image in self.images.items()}

        data = np.load(seg_fn, allow_pickle=True)[()]
        p3d_id = data['id']
        seg_id = data['label']
        self.p3d_seg = {p3d_id[i]: seg_id[i] for i in range(p3d_id.shape[0])}
        self.p3d_xyzs = {}
        for pid in self.p3d_seg.keys():
            self.p3d_xyzs[pid] = point3Ds[pid].xyz

        # one line of six floats: mean_x mean_y mean_z scale_x scale_y scale_z
        with open(sc_mean_scale_fn, 'r') as f:
            for l in f.readlines():
                l = l.strip().split()
                self.mean_xyz = np.array([float(v) for v in l[:3]])
                self.scale_xyz = np.array([float(v) for v in l[3:]])

        if not train:
            self.query_info = self.read_query_info(path=query_info_path)

        self.nfeatures = nfeatures
        self.feature_dir = feature_dir
        print('Pre loaded {} feats, mean xyz {}, scale xyz {}'.format(
            len(self.feats.keys()), self.mean_xyz, self.scale_xyz))

    def normalize_p3ds(self, p3ds):
        mean_p3ds = np.ceil(np.mean(p3ds, axis=0))
        p3ds_ = p3ds - mean_p3ds
        dx = np.max(abs(p3ds_[:, 0]))
        dy = np.max(abs(p3ds_[:, 1]))
        dz = np.max(abs(p3ds_[:, 2]))
        scale_p3ds = np.ceil(np.array([dx, dy, dz], dtype=float).reshape(3, ))
        scale_p3ds[scale_p3ds < 1] = 1  # avoid dividing by (near-)zero scales
        return mean_p3ds, scale_p3ds
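    # Worked example (illustrative): for p3ds = [[1, 2, 3], [-3, 0, 1]],
    # mean_p3ds = ceil(mean) = [-1, 1, 2] and scale_p3ds = [2, 1, 1], i.e. the
    # max absolute deviation per axis, ceil'd and floored at 1.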

    def read_query_info(self, path):
        query_info = {}
        with open(path, 'r') as f:
            lines = f.readlines()
            for l in lines:
                l = l.strip().split()
                image_name = l[0]
                cam_model = l[1]
                h, w = int(l[2]), int(l[3])
                params = np.array([float(v) for v in l[4:]])
                query_info[image_name] = {
                    'width': w,
                    'height': h,
                    'model': cam_model,
                    'params': params,
                }
        return query_info
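    # Assumed query-info line format (mirrors the parser above; the values in
    # the example are illustrative only):
    #   <image_name> <camera_model> <h> <w> <params...>
    #   e.g. query/0001.jpg SIMPLE_RADIAL 480 640 525.0 320.0 240.0 0.01
    # Note that this parser reads height before width.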

    def extract_intrinsic_extrinsic_params(self, image_id):
        cam = self.cameras[self.images[image_id].camera_id]
        params = cam.params
        model = cam.model
        if model in ("SIMPLE_PINHOLE", "SIMPLE_RADIAL", "RADIAL"):
            fx = fy = params[0]
            cx = params[1]
            cy = params[2]
        elif model in ("PINHOLE", "OPENCV", "OPENCV_FISHEYE", "FULL_OPENCV"):
            fx = params[0]
            fy = params[1]
            cx = params[2]
            cy = params[3]
        else:
            raise ValueError('Camera model {} not supported'.format(model))

        K = np.eye(3, dtype=float)
        K[0, 0] = fx
        K[1, 1] = fy
        K[0, 2] = cx
        K[1, 2] = cy

        qvec = self.images[image_id].qvec
        tvec = self.images[image_id].tvec
        R = qvec2rotmat(qvec=qvec)
        P = np.eye(4, dtype=float)
        P[:3, :3] = R
        P[:3, 3] = tvec.reshape(3, )
        return {'K': K, 'P': P}
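    # P above follows the COLMAP convention: qvec/tvec store the world-to-camera
    # transform, so a world point X projects as x ~ K @ (R @ X + t).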

    def get_item_train(self, idx):
        img_name = self.img_fns[idx]
        if img_name in self.feats.keys():
            feat_data = self.feats[img_name]
        else:
            feat_data = np.load(osp.join(self.feature_dir, img_name.replace('/', '+') + '.npy'),
                                allow_pickle=True)[()]
        # descs = feat_data['descriptors']  # [N, D]
        scores = feat_data['scores']  # [N, 1]
        kpts = feat_data['keypoints']  # [N, 2]
        image_size = feat_data['image_size']
        nfeat = kpts.shape[0]

        # map each keypoint to the segmentation label and coordinates of its 3D point
        p3d_ids = self.images[self.name_to_id[img_name]].point3D_ids
        p3d_xyzs = np.zeros(shape=(nfeat, 3), dtype=float)
        seg_ids = np.zeros(shape=(nfeat,), dtype=int)  # 0 for invalid (alternative default: self.n_class - 1)
        for i in range(nfeat):
            p3d = p3d_ids[i]
            if p3d in self.p3d_seg.keys():
                seg_ids[i] = self.p3d_seg[p3d] + 1  # shift labels by 1; 0 is reserved for invalid
                if seg_ids[i] == -1:
                    seg_ids[i] = 0
            if p3d in self.p3d_xyzs.keys():
                p3d_xyzs[i] = self.p3d_xyzs[p3d]
        seg_ids = np.array(seg_ids).reshape(-1, )

        n_inliers = np.sum(seg_ids > 0)
        n_outliers = np.sum(seg_ids == 0)
        inlier_ids = np.where(seg_ids > 0)[0]
        outlier_ids = np.where(seg_ids == 0)[0]
        if n_inliers <= self.min_inliers:
            # too few inliers: keep them all and fill up with random outliers
            sel_inliers = n_inliers
            sel_outliers = self.nfeatures - sel_inliers
            out_ids = np.arange(n_outliers)
            np.random.shuffle(out_ids)
            sel_ids = np.hstack([inlier_ids, outlier_ids[out_ids[:self.nfeatures - n_inliers]]])
        else:
            # sample a random number of inliers, then pad with outliers
            sel_inliers = np.random.randint(self.min_inliers, self.max_inliers)
            if sel_inliers > n_inliers:
                sel_inliers = n_inliers
            if sel_inliers + n_outliers < self.nfeatures:
                sel_inliers = self.nfeatures - n_outliers
            sel_outliers = self.nfeatures - sel_inliers

            in_ids = np.arange(n_inliers)
            np.random.shuffle(in_ids)
            sel_inlier_ids = inlier_ids[in_ids[:sel_inliers]]

            out_ids = np.arange(n_outliers)
            np.random.shuffle(out_ids)
            sel_outlier_ids = outlier_ids[out_ids[:sel_outliers]]
            sel_ids = np.hstack([sel_inlier_ids, sel_outlier_ids])

        # sel_descs = descs[sel_ids]
        sel_scores = scores[sel_ids]
        sel_kpts = kpts[sel_ids]
        sel_seg_ids = seg_ids[sel_ids]
        sel_xyzs = p3d_xyzs[sel_ids]

        # shuffle so that inliers and outliers are interleaved
        shuffle_ids = np.arange(sel_ids.shape[0])
        np.random.shuffle(shuffle_ids)
        # sel_descs = sel_descs[shuffle_ids]
        sel_scores = sel_scores[shuffle_ids]
        sel_kpts = sel_kpts[shuffle_ids]
        sel_seg_ids = sel_seg_ids[shuffle_ids]
        sel_xyzs = sel_xyzs[shuffle_ids]

        if sel_kpts.shape[0] < self.nfeatures:
            # not enough keypoints: synthesize extra points around valid ones
            valid_sel_ids = np.array([v for v in range(sel_kpts.shape[0]) if sel_seg_ids[v] > 0], dtype=int)
            if valid_sel_ids.shape[0] == 0:
                valid_sel_ids = np.arange(sel_kpts.shape[0])
            random_n = self.nfeatures - sel_kpts.shape[0]
            random_scores = np.random.random((random_n,))
            random_kpts, random_seg_ids, random_xyzs = self.random_points_from_reference(
                n=random_n,
                ref_kpts=sel_kpts[valid_sel_ids],
                ref_segs=sel_seg_ids[valid_sel_ids],
                ref_xyzs=sel_xyzs[valid_sel_ids],
                radius=5,
            )
            # sel_descs = np.vstack([sel_descs, random_descs])
            sel_scores = np.hstack([sel_scores, random_scores])
            sel_kpts = np.vstack([sel_kpts, random_kpts])
            sel_seg_ids = np.hstack([sel_seg_ids, random_seg_ids])
            sel_xyzs = np.vstack([sel_xyzs, random_xyzs])

        gt_n_seg = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls_dist = np.zeros(shape=(self.n_class,), dtype=float)
        uids = np.unique(sel_seg_ids).tolist()
        for uid in uids:
            if uid == 0:
                continue
            gt_cls[uid] = 1
            gt_n_seg[uid] = np.sum(sel_seg_ids == uid)
            gt_cls_dist[uid] = np.sum(seg_ids == uid) / np.sum(seg_ids > 0)  # [valid_id / total_valid_id]

        param_out = self.extract_intrinsic_extrinsic_params(image_id=self.name_to_id[img_name])

        img = self.read_image(image_name=img_name)
        image_size = img.shape[:2]
        if self.image_dim == 1:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.with_aug:
            nh = img.shape[0]
            nw = img.shape[1]
            if self.scale_params is not None:
                do_scale = np.random.random()
                if do_scale <= 0.25:
                    p = np.random.randint(0, 11)
                    s = self.scale_params[0] + (self.scale_params[1] - self.scale_params[0]) / 10 * p
                    nh = int(img.shape[0] * s)
                    nw = int(img.shape[1] * s)
                    sh = nh / img.shape[0]
                    sw = nw / img.shape[1]
                    sel_kpts[:, 0] = sel_kpts[:, 0] * sw
                    sel_kpts[:, 1] = sel_kpts[:, 1] * sh
                    img = cv2.resize(img, dsize=(nw, nh))

            brightness = np.random.uniform(-self.jitter_params['brightness'], self.jitter_params['brightness']) * 255
            contrast = 1 + np.random.uniform(-self.jitter_params['contrast'], self.jitter_params['contrast'])
            img = cv2.addWeighted(img, contrast, img, 0, brightness)
            img = np.clip(img, a_min=0, a_max=255)
            if self.image_dim == 1:
                img = img[..., None]
            img = img.astype(float) / 255.
            image_size = np.array([nh, nw], dtype=int)
        else:
            if self.image_dim == 1:
                img = img[..., None]
            img = img.astype(float) / 255.  # normalize color images too, matching get_item_test

        output = {
            # 'descriptors': sel_descs,  # may not be used
            'scores': sel_scores,
            'keypoints': sel_kpts,
            'norm_keypoints': normalize_size(x=sel_kpts, size=image_size),
            'image': [img],
            'gt_seg': sel_seg_ids,
            'gt_cls': gt_cls,
            'gt_cls_dist': gt_cls_dist,
            'gt_n_seg': gt_n_seg,
            'file_name': img_name,
            'prefix_name': self.image_prefix,
            # 'mean_xyz': self.mean_xyz,
            # 'scale_xyz': self.scale_xyz,
            # 'gt_sc': sel_xyzs,
            # 'gt_norm_sc': (sel_xyzs - self.mean_xyz) / self.scale_xyz,
            'K': param_out['K'],
            'gt_P': param_out['P'],
        }
        return output

    def get_item_test(self, idx):
        # evaluation of recognition only
        img_name = self.img_fns[idx]
        feat_data = np.load(osp.join(self.feature_dir, img_name.replace('/', '+') + '.npy'),
                            allow_pickle=True)[()]
        descs = feat_data['descriptors']  # [N, D]
        scores = feat_data['scores']  # [N, 1]
        kpts = feat_data['keypoints']  # [N, 2]
        image_size = feat_data['image_size']
        nfeat = descs.shape[0]

        # attention! labels are invalid (0) by default; they stay 0 for queries
        # without precomputed 3D point ids
        p3d_xyzs = np.zeros(shape=(nfeat, 3), dtype=float)
        seg_ids = np.zeros(shape=(nfeat,), dtype=int)
        if img_name in self.img_p3d.keys():
            p3d_ids = self.img_p3d[img_name]
            for i in range(nfeat):
                p3d = p3d_ids[i]
                if p3d in self.p3d_seg.keys():
                    seg_ids[i] = self.p3d_seg[p3d] + 1
                    if seg_ids[i] == -1:
                        seg_ids[i] = 0  # 0 for invalid
                if p3d in self.p3d_xyzs.keys():
                    p3d_xyzs[i] = self.p3d_xyzs[p3d]
        seg_ids = np.array(seg_ids).reshape(-1, )

        if self.nfeatures > 0:
            # keep the top-k keypoints by score (sorted from large to small)
            sorted_ids = np.argsort(scores)[::-1][:self.nfeatures]
            descs = descs[sorted_ids]
            scores = scores[sorted_ids]
            kpts = kpts[sorted_ids]
            p3d_xyzs = p3d_xyzs[sorted_ids]
            seg_ids = seg_ids[sorted_ids]

        gt_n_seg = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls = np.zeros(shape=(self.n_class,), dtype=int)
        gt_cls_dist = np.zeros(shape=(self.n_class,), dtype=float)
        uids = np.unique(seg_ids).tolist()
        for uid in uids:
            if uid == 0:
                continue
            gt_cls[uid] = 1
            gt_n_seg[uid] = np.sum(seg_ids == uid)
            gt_cls_dist[uid] = np.sum(seg_ids == uid) / np.sum(seg_ids < self.n_class - 1)  # [valid_id / total_valid_id]
        gt_cls[0] = 0

        img = self.read_image(image_name=img_name)
        if self.image_dim == 1:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = img[..., None].astype(float) / 255.
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(float) / 255.

        return {
            'descriptors': descs,
            'scores': scores,
            'keypoints': kpts,
            'image_size': image_size,
            'norm_keypoints': normalize_size(x=kpts, size=image_size),
            'gt_seg': seg_ids,
            'gt_cls': gt_cls,
            'gt_cls_dist': gt_cls_dist,
            'gt_n_seg': gt_n_seg,
            'file_name': img_name,
            'prefix_name': self.image_prefix,
            'image': [img],
            'mean_xyz': self.mean_xyz,
            'scale_xyz': self.scale_xyz,
            'gt_sc': p3d_xyzs,
            'gt_norm_sc': (p3d_xyzs - self.mean_xyz) / self.scale_xyz,
        }

    def __getitem__(self, idx):
        if self.train:
            return self.get_item_train(idx=idx)
        else:
            return self.get_item_test(idx=idx)

    def __len__(self):
        return len(self.img_fns)

    def read_image(self, image_name):
        return cv2.imread(osp.join(self.dataset_path, image_name))

    def jitter_augmentation(self, img, params):
        brightness, contrast, saturation, hue = params
        # sample each factor uniformly on a 20-step grid within [lo, hi]; the
        # original divided by 20 twice, which compressed the range to ~5%
        p = np.random.randint(0, 20) / 20
        b = brightness[0] + (brightness[1] - brightness[0]) * p
        img = tvf.adjust_brightness(img=img, brightness_factor=b)

        p = np.random.randint(0, 20) / 20
        c = contrast[0] + (contrast[1] - contrast[0]) * p
        img = tvf.adjust_contrast(img=img, contrast_factor=c)

        p = np.random.randint(0, 20) / 20
        s = saturation[0] + (saturation[1] - saturation[0]) * p
        img = tvf.adjust_saturation(img=img, saturation_factor=s)

        p = np.random.randint(0, 20) / 20
        h = hue[0] + (hue[1] - hue[0]) * p
        img = tvf.adjust_hue(img=img, hue_factor=h)
        return img
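    # Note: jitter_augmentation uses torchvision.transforms.functional, which
    # expects a PIL Image or torch.Tensor; the training path in get_item_train
    # instead jitters raw numpy/cv2 images via cv2.addWeighted.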

    def random_points(self, n, d, h, w):
        # random unit-norm descriptors and random keypoint locations
        desc = np.random.random((n, d))
        desc = desc / np.linalg.norm(desc, ord=2, axis=1)[..., None]
        xs = np.random.randint(0, w - 1, size=(n, 1))
        ys = np.random.randint(0, h - 1, size=(n, 1))
        kpts = np.hstack([xs, ys])
        return desc, kpts

    def random_points_from_reference(self, n, ref_kpts, ref_segs, ref_xyzs, radius=5):
        # jitter existing keypoints within a small radius, reusing their labels
        # and 3D coordinates
        n_ref = ref_kpts.shape[0]
        if n_ref < n:
            ref_ids = np.random.choice(n_ref, size=n).tolist()
        else:
            ref_ids = list(range(n))

        new_xs = []
        new_ys = []
        # new_descs = []
        new_segs = []
        new_xyzs = []
        for i in ref_ids:
            nx = np.random.randint(-radius, radius) + ref_kpts[i, 0]
            ny = np.random.randint(-radius, radius) + ref_kpts[i, 1]
            new_xs.append(nx)
            new_ys.append(ny)
            # new_descs.append(ref_descs[i])
            new_segs.append(ref_segs[i])
            new_xyzs.append(ref_xyzs[i])

        new_xs = np.array(new_xs).reshape(n, 1)
        new_ys = np.array(new_ys).reshape(n, 1)
        new_segs = np.array(new_segs).reshape(n, )
        new_kpts = np.hstack([new_xs, new_ys])
        # new_descs = np.array(new_descs).reshape(n, -1)
        new_xyzs = np.array(new_xyzs)
        return new_kpts, new_segs, new_xyzs
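

if __name__ == '__main__':
    # Minimal usage sketch. All paths below are hypothetical placeholders and
    # must point to real preprocessed data (cached features, a COLMAP model,
    # 3D-point segmentation labels, and the scene-coordinate mean/scale file).
    dataset = BasicDataset(
        img_list_fn='data/imglist_train.txt',  # hypothetical
        feature_dir='data/features',  # hypothetical
        sfm_path='data/sfm',  # hypothetical
        seg_fn='data/p3d_segs.npy',  # hypothetical
        dataset_path='data/images',  # hypothetical
        n_class=256,  # hypothetical
        dataset='example',
        nfeatures=1024,
        train=True,
        sc_mean_scale_fn='data/sc_mean_scale.txt',  # hypothetical
    )
    sample = dataset[0]
    print(sample['keypoints'].shape, sample['gt_seg'].shape)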