Spaces:
Running
Running
""" | |
This file implements the homographic transforms for data augmentation. | |
Code adapted from https://github.com/rpautrat/SuperPoint | |
""" | |
import numpy as np | |
from math import pi | |
from ..synthetic_util import get_line_map, get_line_heatmap | |
import cv2 | |
import copy | |
import shapely.geometry | |
def sample_homography( | |
shape, | |
perspective=True, | |
scaling=True, | |
rotation=True, | |
translation=True, | |
n_scales=5, | |
n_angles=25, | |
scaling_amplitude=0.1, | |
perspective_amplitude_x=0.1, | |
perspective_amplitude_y=0.1, | |
patch_ratio=0.5, | |
max_angle=pi / 2, | |
allow_artifacts=False, | |
translation_overflow=0.0, | |
): | |
""" | |
Computes the homography transformation between a random patch in the | |
original image and a warped projection with the same image size. | |
As in `tf.contrib.image.transform`, it maps the output point | |
(warped patch) to a transformed input point (original patch). | |
The original patch, initialized with a simple half-size centered crop, | |
is iteratively projected, scaled, rotated and translated. | |
Arguments: | |
shape: A rank-2 `Tensor` specifying the height and width of the original image. | |
perspective: A boolean that enables the perspective and affine transformations. | |
scaling: A boolean that enables the random scaling of the patch. | |
rotation: A boolean that enables the random rotation of the patch. | |
translation: A boolean that enables the random translation of the patch. | |
n_scales: The number of tentative scales that are sampled when scaling. | |
n_angles: The number of tentatives angles that are sampled when rotating. | |
scaling_amplitude: Controls the amount of scale. | |
perspective_amplitude_x: Controls the perspective effect in x direction. | |
perspective_amplitude_y: Controls the perspective effect in y direction. | |
patch_ratio: Controls the size of the patches used to create the homography. | |
max_angle: Maximum angle used in rotations. | |
allow_artifacts: A boolean that enables artifacts when applying the homography. | |
translation_overflow: Amount of border artifacts caused by translation. | |
Returns: | |
homo_mat: A numpy array of shape `[1, 3, 3]` corresponding to the | |
homography transform. | |
selected_scale: The selected scaling factor. | |
""" | |
# Convert shape to ndarry | |
if not isinstance(shape, np.ndarray): | |
shape = np.array(shape) | |
# Corners of the output image | |
pts1 = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0], [1.0, 0.0]]) | |
# Corners of the input patch | |
margin = (1 - patch_ratio) / 2 | |
pts2 = margin + np.array( | |
[[0, 0], [0, patch_ratio], [patch_ratio, patch_ratio], [patch_ratio, 0]] | |
) | |
# Random perspective and affine perturbations | |
if perspective: | |
if not allow_artifacts: | |
perspective_amplitude_x = min(perspective_amplitude_x, margin) | |
perspective_amplitude_y = min(perspective_amplitude_y, margin) | |
# normal distribution with mean=0, std=perspective_amplitude_y/2 | |
perspective_displacement = np.random.normal( | |
0.0, perspective_amplitude_y / 2, [1] | |
) | |
h_displacement_left = np.random.normal(0.0, perspective_amplitude_x / 2, [1]) | |
h_displacement_right = np.random.normal(0.0, perspective_amplitude_x / 2, [1]) | |
pts2 += np.stack( | |
[ | |
np.concatenate([h_displacement_left, perspective_displacement], 0), | |
np.concatenate([h_displacement_left, -perspective_displacement], 0), | |
np.concatenate([h_displacement_right, perspective_displacement], 0), | |
np.concatenate([h_displacement_right, -perspective_displacement], 0), | |
] | |
) | |
# Random scaling: sample several scales, check collision with borders, | |
# randomly pick a valid one | |
if scaling: | |
scales = np.concatenate( | |
[[1.0], np.random.normal(1, scaling_amplitude / 2, [n_scales])], 0 | |
) | |
center = np.mean(pts2, axis=0, keepdims=True) | |
scaled = (pts2 - center)[None, ...] * scales[..., None, None] + center | |
# all scales are valid except scale=1 | |
if allow_artifacts: | |
valid = np.array(range(n_scales)) | |
# Chech the valid scale | |
else: | |
valid = np.where(np.all((scaled >= 0.0) & (scaled < 1.0), (1, 2)))[0] | |
# No valid scale found => recursively call | |
if valid.shape[0] == 0: | |
return sample_homography( | |
shape, | |
perspective, | |
scaling, | |
rotation, | |
translation, | |
n_scales, | |
n_angles, | |
scaling_amplitude, | |
perspective_amplitude_x, | |
perspective_amplitude_y, | |
patch_ratio, | |
max_angle, | |
allow_artifacts, | |
translation_overflow, | |
) | |
idx = valid[np.random.uniform(0.0, valid.shape[0], ()).astype(np.int32)] | |
pts2 = scaled[idx] | |
# Additionally save and return the selected scale. | |
selected_scale = scales[idx] | |
# Random translation | |
if translation: | |
t_min, t_max = np.min(pts2, axis=0), np.min(1 - pts2, axis=0) | |
if allow_artifacts: | |
t_min += translation_overflow | |
t_max += translation_overflow | |
pts2 += ( | |
np.stack( | |
[ | |
np.random.uniform(-t_min[0], t_max[0], ()), | |
np.random.uniform(-t_min[1], t_max[1], ()), | |
] | |
) | |
)[None, ...] | |
# Random rotation: sample several rotations, check collision with borders, | |
# randomly pick a valid one | |
if rotation: | |
angles = np.linspace(-max_angle, max_angle, n_angles) | |
# in case no rotation is valid | |
angles = np.concatenate([[0.0], angles], axis=0) | |
center = np.mean(pts2, axis=0, keepdims=True) | |
rot_mat = np.reshape( | |
np.stack( | |
[np.cos(angles), -np.sin(angles), np.sin(angles), np.cos(angles)], | |
axis=1, | |
), | |
[-1, 2, 2], | |
) | |
rotated = ( | |
np.matmul( | |
np.tile((pts2 - center)[None, ...], [n_angles + 1, 1, 1]), rot_mat | |
) | |
+ center | |
) | |
if allow_artifacts: | |
# All angles are valid, except angle=0 | |
valid = np.array(range(n_angles)) | |
else: | |
valid = np.where(np.all((rotated >= 0.0) & (rotated < 1.0), axis=(1, 2)))[0] | |
if valid.shape[0] == 0: | |
return sample_homography( | |
shape, | |
perspective, | |
scaling, | |
rotation, | |
translation, | |
n_scales, | |
n_angles, | |
scaling_amplitude, | |
perspective_amplitude_x, | |
perspective_amplitude_y, | |
patch_ratio, | |
max_angle, | |
allow_artifacts, | |
translation_overflow, | |
) | |
idx = valid[np.random.uniform(0.0, valid.shape[0], ()).astype(np.int32)] | |
pts2 = rotated[idx] | |
# Rescale to actual size | |
shape = shape[::-1].astype(np.float32) # different convention [y, x] | |
pts1 *= shape[None, ...] | |
pts2 *= shape[None, ...] | |
def ax(p, q): | |
return [p[0], p[1], 1, 0, 0, 0, -p[0] * q[0], -p[1] * q[0]] | |
def ay(p, q): | |
return [0, 0, 0, p[0], p[1], 1, -p[0] * q[1], -p[1] * q[1]] | |
a_mat = np.stack([f(pts1[i], pts2[i]) for i in range(4) for f in (ax, ay)], axis=0) | |
p_mat = np.transpose( | |
np.stack([[pts2[i][j] for i in range(4) for j in range(2)]], axis=0) | |
) | |
homo_vec, _, _, _ = np.linalg.lstsq(a_mat, p_mat, rcond=None) | |
# Compose the homography vector back to matrix | |
homo_mat = np.concatenate( | |
[ | |
homo_vec[0:3, 0][None, ...], | |
homo_vec[3:6, 0][None, ...], | |
np.concatenate((homo_vec[6], homo_vec[7], [1]), axis=0)[None, ...], | |
], | |
axis=0, | |
) | |
return homo_mat, selected_scale | |
def convert_to_line_segments(junctions, line_map): | |
"""Convert junctions and line map to line segments.""" | |
# Copy the line map | |
line_map_tmp = copy.copy(line_map) | |
line_segments = np.zeros([0, 4]) | |
for idx in range(junctions.shape[0]): | |
# If no connectivity, just skip it | |
if line_map_tmp[idx, :].sum() == 0: | |
continue | |
# Record the line segment | |
else: | |
for idx2 in np.where(line_map_tmp[idx, :] == 1)[0]: | |
p1 = junctions[idx, :] | |
p2 = junctions[idx2, :] | |
line_segments = np.concatenate( | |
(line_segments, np.array([p1[0], p1[1], p2[0], p2[1]])[None, ...]), | |
axis=0, | |
) | |
# Update line_map | |
line_map_tmp[idx, idx2] = 0 | |
line_map_tmp[idx2, idx] = 0 | |
return line_segments | |
def compute_valid_mask(image_size, homography, border_margin, valid_mask=None): | |
# Warp the mask | |
if valid_mask is None: | |
initial_mask = np.ones(image_size) | |
else: | |
initial_mask = valid_mask | |
mask = cv2.warpPerspective( | |
initial_mask, | |
homography, | |
(image_size[1], image_size[0]), | |
flags=cv2.INTER_NEAREST, | |
) | |
# Optionally perform erosion | |
if border_margin > 0: | |
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (border_margin * 2,) * 2) | |
mask = cv2.erode(mask, kernel) | |
# Perform dilation if border_margin is negative | |
if border_margin < 0: | |
kernel = cv2.getStructuringElement( | |
cv2.MORPH_ELLIPSE, (abs(int(border_margin)) * 2,) * 2 | |
) | |
mask = cv2.dilate(mask, kernel) | |
return mask | |
def warp_line_segment(line_segments, homography, image_size): | |
"""Warp the line segments using a homography.""" | |
# Separate the line segements into 2N points to apply matrix operation | |
num_segments = line_segments.shape[0] | |
junctions = np.concatenate( | |
( | |
line_segments[:, :2], # The first junction of each segment. | |
line_segments[:, 2:], | |
), # The second junction of each segment. | |
axis=0, | |
) | |
# Convert to homogeneous coordinates | |
# Flip the junctions before converting to homogeneous (xy format) | |
junctions = np.flip(junctions, axis=1) | |
junctions = np.concatenate((junctions, np.ones([2 * num_segments, 1])), axis=1) | |
warped_junctions = np.matmul(homography, junctions.T).T | |
# Convert back to segments | |
warped_junctions = warped_junctions[:, :2] / warped_junctions[:, 2:] | |
# (Convert back to hw format) | |
warped_junctions = np.flip(warped_junctions, axis=1) | |
warped_segments = np.concatenate( | |
(warped_junctions[:num_segments, :], warped_junctions[num_segments:, :]), axis=1 | |
) | |
# Check the intersections with the boundary | |
warped_segments_new = np.zeros([0, 4]) | |
image_poly = shapely.geometry.Polygon( | |
[ | |
[0, 0], | |
[image_size[1] - 1, 0], | |
[image_size[1] - 1, image_size[0] - 1], | |
[0, image_size[0] - 1], | |
] | |
) | |
for idx in range(warped_segments.shape[0]): | |
# Get the line segment | |
seg_raw = warped_segments[idx, :] # in HW format. | |
# Convert to shapely line (flip to xy format) | |
seg = shapely.geometry.LineString([np.flip(seg_raw[:2]), np.flip(seg_raw[2:])]) | |
# The line segment is just inside the image. | |
if seg.intersection(image_poly) == seg: | |
warped_segments_new = np.concatenate( | |
(warped_segments_new, seg_raw[None, ...]), axis=0 | |
) | |
# Intersect with the image. | |
elif seg.intersects(image_poly): | |
# Check intersection | |
try: | |
p = np.array(seg.intersection(image_poly).coords).reshape([-1, 4]) | |
# If intersect at exact one point, just continue. | |
except: | |
continue | |
segment = np.concatenate([np.flip(p[0, :2]), np.flip(p[0, 2:], axis=0)])[ | |
None, ... | |
] | |
warped_segments_new = np.concatenate((warped_segments_new, segment), axis=0) | |
else: | |
continue | |
warped_segments = (np.round(warped_segments_new)).astype(np.int) | |
return warped_segments | |
class homography_transform(object): | |
"""# Homography transformations.""" | |
def __init__(self, image_size, homograpy_config, border_margin=0, min_label_len=20): | |
self.homo_config = homograpy_config | |
self.image_size = image_size | |
self.target_size = (self.image_size[1], self.image_size[0]) | |
self.border_margin = border_margin | |
if (min_label_len < 1) and isinstance(min_label_len, float): | |
raise ValueError("[Error] min_label_len should be in pixels.") | |
self.min_label_len = min_label_len | |
def __call__( | |
self, input_image, junctions, line_map, valid_mask=None, homo=None, scale=None | |
): | |
# Sample one random homography or use the given one | |
if homo is None or scale is None: | |
homo, scale = sample_homography(self.image_size, **self.homo_config) | |
# Warp the image | |
warped_image = cv2.warpPerspective( | |
input_image, homo, self.target_size, flags=cv2.INTER_LINEAR | |
) | |
valid_mask = compute_valid_mask( | |
self.image_size, homo, self.border_margin, valid_mask | |
) | |
# Convert junctions and line_map back to line segments | |
line_segments = convert_to_line_segments(junctions, line_map) | |
# Warp the segments and check the length. | |
# Adjust the min_label_length | |
warped_segments = warp_line_segment(line_segments, homo, self.image_size) | |
# Convert back to junctions and line_map | |
junctions_new = np.concatenate( | |
(warped_segments[:, :2], warped_segments[:, 2:]), axis=0 | |
) | |
if junctions_new.shape[0] == 0: | |
junctions_new = np.zeros([0, 2]) | |
line_map = np.zeros([0, 0]) | |
warped_heatmap = np.zeros(self.image_size) | |
else: | |
junctions_new = np.unique(junctions_new, axis=0) | |
# Generate line map from points and segments | |
line_map = get_line_map(junctions_new, warped_segments).astype(np.int) | |
# Compute the heatmap | |
warped_heatmap = get_line_heatmap( | |
np.flip(junctions_new, axis=1), line_map, self.image_size | |
) | |
return { | |
"junctions": junctions_new, | |
"warped_image": warped_image, | |
"valid_mask": valid_mask, | |
"line_map": line_map, | |
"warped_heatmap": warped_heatmap, | |
"homo": homo, | |
"scale": scale, | |
} | |