commited on
Create geometry.py
Browse files- geometry.py +354 -0
@@ -0,0 +1,354 @@
1 |
import numpy as np
2 |
import torch
3 |
import time
4 |
import imageio
5 |
from skimage.draw import line
6 |
from easydict import EasyDict as edict
7 |
8 |
from pytorch3d.renderer import NDCMultinomialRaysampler, ray_bundle_to_ray_points
9 |
from pytorch3d.utils import cameras_from_opencv_projection
10 |
from einops import rearrange
11 |
12 |
from torch.nn import functional as F
13 |
14 |
# cache for fast epipolar line drawing
15 |
16 |
masks32 = np.load("/fs01/home/yashkant/spad-code/cache/masks32.npy", allow_pickle=True)
17 |
18 |
print(f"failed to load cache for fast epipolar line drawing, this does not affect final results")
19 |
masks32 = None
20 |
21 |
22 |
def compute_epipolar_mask(src_frame, tgt_frame, imh, imw, dialate_mask=True, debug_depth=False, visualize_mask=False):
23 |
24 |
src_frame: source frame containing camera
25 |
tgt_frame: target frame containing camera
26 |
debug_depth: if True, uses depth map to compute epipolar lines on target image (debugging)
27 |
visualize_mask: if True, saves a batched attention masks (debugging)
28 |
29 |
30 |
# generates raybundle using camera intrinsics and extrinsics
31 |
src_ray_bundle = NDCMultinomialRaysampler(
32 |
33 |
34 |
35 |
36 |
37 |
38 |
39 |
src_depth = getattr(src_frame, "depth_map", None)
40 |
if debug_depth and src_depth is not None:
41 |
src_depth = src_depth[:, 0, ..., None]
42 |
src_depth[src_depth >= 100] = 100 # clip depth
43 |
44 |
# get points in world space (at fixed depth)
45 |
src_depth = 3.5 * torch.ones((1, imh, imw, 1), dtype=torch.float32, device=src_frame.camera.device)
46 |
47 |
pts_world = ray_bundle_to_ray_points(
48 |
49 |
50 |
# print(f"world points bounds: {pts_world.reshape(-1,3).min(dim=0)[0]} to {pts_world.reshape(-1,3).max(dim=0)[0]}")
51 |
rays_time = time.time()
52 |
53 |
# move source points to target screen space
54 |
tgt_pts_screen = tgt_frame.camera.transform_points_screen(pts_world.squeeze(), image_size=(imh, imw))
55 |
56 |
# move source camera center to target screen space
57 |
src_center_tgt_screen = tgt_frame.camera.transform_points_screen(src_frame.camera.get_camera_center(), image_size=(imh, imw)).squeeze()
58 |
59 |
# build epipolar mask (draw lines from source camera center to source points in target screen space)
60 |
# start: source camera center, end: source points in target screen space
61 |
62 |
# get flow of points
63 |
center_to_pts_flow = tgt_pts_screen[...,:2] - src_center_tgt_screen[...,:2]
64 |
65 |
# normalize flow
66 |
center_to_pts_flow = center_to_pts_flow / center_to_pts_flow.norm(dim=-1, keepdim=True)
67 |
68 |
# get slope and intercept of lines
69 |
slope = center_to_pts_flow[:,:,0:1] / center_to_pts_flow[:,:,1:2]
70 |
intercept = tgt_pts_screen[:,:, 0:1] - slope * tgt_pts_screen[:,:, 1:2]
71 |
72 |
# find intersection of lines with tgt screen (x = 0, x = imw, y = 0, y = imh)
73 |
left = slope * 0 + intercept
74 |
left_sane = (left <= imh) & (0 <= left)
75 |
left = torch.cat([left, torch.zeros_like(left)], dim=-1)
76 |
77 |
right = slope * imw + intercept
78 |
right_sane = (right <= imh) & (0 <= right)
79 |
right = torch.cat([right, torch.ones_like(right) * imw], dim=-1)
80 |
81 |
top = (0 - intercept) / slope
82 |
top_sane = (top <= imw) & (0 <= top)
83 |
top = torch.cat([torch.zeros_like(top), top], dim=-1)
84 |
85 |
bottom = (imh - intercept) / slope
86 |
bottom_sane = (bottom <= imw) & (0 <= bottom)
87 |
bottom = torch.cat([torch.ones_like(bottom) * imh, bottom], dim=-1)
88 |
89 |
# find intersection of lines
90 |
points_one = torch.zeros_like(left)
91 |
points_two = torch.zeros_like(left)
92 |
93 |
# collect points from [left, right, bottom, top] in sequence
94 |
points_one = torch.where(left_sane.repeat(1,1,2), left, points_one)
95 |
96 |
points_one_zero = (points_one.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
97 |
points_one = torch.where(right_sane.repeat(1,1,2) & points_one_zero, right, points_one)
98 |
99 |
points_one_zero = (points_one.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
100 |
points_one = torch.where(bottom_sane.repeat(1,1,2) & points_one_zero, bottom, points_one)
101 |
102 |
points_one_zero = (points_one.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
103 |
points_one = torch.where(top_sane.repeat(1,1,2) & points_one_zero, top, points_one)
104 |
105 |
# collect points from [top, bottom, right, left] in sequence (opposite)
106 |
points_two = torch.where(top_sane.repeat(1,1,2), top, points_two)
107 |
108 |
points_two_zero = (points_two.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
109 |
points_two = torch.where(bottom_sane.repeat(1,1,2) & points_two_zero, bottom, points_two)
110 |
111 |
points_two_zero = (points_two.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
112 |
points_two = torch.where(right_sane.repeat(1,1,2) & points_two_zero, right, points_two)
113 |
114 |
points_two_zero = (points_two.sum(dim=-1) == 0).unsqueeze(-1).repeat(1,1,2)
115 |
points_two = torch.where(left_sane.repeat(1,1,2) & points_two_zero, left, points_two)
116 |
117 |
# if source point lies inside target screen (find only one intersection)
118 |
if (imh >= src_center_tgt_screen[0] >= 0) and (imw >= src_center_tgt_screen[1] >= 0):
119 |
points_one_flow = points_one - src_center_tgt_screen[:2]
120 |
points_one_flow_direction = (points_one_flow > 0)
121 |
122 |
points_two_flow = points_two - src_center_tgt_screen[:2]
123 |
points_two_flow_direction = (points_two_flow > 0)
124 |
125 |
orig_flow_direction = (center_to_pts_flow > 0)
126 |
127 |
# if flow direction is same as orig flow direction, pick points_one, else points_two
128 |
points_one_alinged = (points_one_flow_direction == orig_flow_direction).all(dim=-1).unsqueeze(-1).repeat(1,1,2)
129 |
points_one = torch.where(points_one_alinged, points_one, points_two)
130 |
131 |
# points two is source camera center
132 |
points_two = points_two * 0 + src_center_tgt_screen[:2]
133 |
134 |
# if debug terminate with depth
135 |
if debug_depth:
136 |
# remove points that are out of bounds (in target screen space)
137 |
tgt_pts_screen_mask = (tgt_pts_screen[...,:2] < 0) | (tgt_pts_screen[...,:2] > imh)
138 |
tgt_pts_screen_mask = ~tgt_pts_screen_mask.any(dim=-1, keepdim=True)
139 |
140 |
depth_dist = torch.norm(src_center_tgt_screen[:2] - tgt_pts_screen[...,:2], dim=-1, keepdim=True)
141 |
points_one_dist = torch.norm(src_center_tgt_screen[:2] - points_one, dim=-1, keepdim=True)
142 |
points_two_dist = torch.norm(src_center_tgt_screen[:2] - points_two, dim=-1, keepdim=True)
143 |
144 |
# replace where reprojected point is closer to source camera on target screen
145 |
points_one = torch.where((depth_dist < points_one_dist) & tgt_pts_screen_mask, tgt_pts_screen[...,:2], points_one)
146 |
points_two = torch.where((depth_dist < points_two_dist) & tgt_pts_screen_mask, tgt_pts_screen[...,:2], points_two)
147 |
148 |
# build epipolar mask
149 |
attention_mask = torch.zeros((imh * imw, imh, imw), dtype=torch.bool, device=src_frame.camera.device)
150 |
151 |
# quantize points to pixel indices
152 |
points_one = (points_one - 0.5).reshape(-1,2).long().numpy()
153 |
points_two = (points_two - 0.5).reshape(-1,2).long().numpy()
154 |
155 |
# cache only supports 32x32 epipolar mask with 3x3 dilation
156 |
if not (imh == 32 and imw == 32) or not dialate_mask or masks32 is None:
157 |
# iterate over points_one and points_two together and draw lines
158 |
for idx, (p1, p2) in enumerate(zip(points_one, points_two)):
159 |
# skip out of bounds points
160 |
if p1.sum() == 0 and p2.sum() == 0:
161 |
162 |
163 |
if not dialate_mask:
164 |
# draw line from p1 to p2
165 |
rr, cc = line(int(p1[1]), int(p1[0]), int(p2[1]), int(p2[0]), use_cache=False)
166 |
rr, cc = rr.astype(np.int32), cc.astype(np.int32)
167 |
attention_mask[idx, rr, cc] = True
168 |
169 |
# draw lines with mask dilation (from all neighbors of p1 to neighbors of p2)
170 |
rrs, ccs = [], []
171 |
for dx, dy in [(0,0), (0,1), (1,1), (1,0), (1,-1), (0,-1), (-1,-1), (-1,0), (-1,1)]: # 8 neighbors
172 |
_p1 = [min(max(p1[0] + dy, 0), imh - 1), min(max(p1[1] + dx, 0), imw - 1)]
173 |
_p2 = [min(max(p2[0] + dy, 0), imh - 1), min(max(p2[1] + dx, 0), imw - 1)]
174 |
rr, cc = line(int(_p1[1]), int(_p1[0]), int(_p2[1]), int(_p2[0]))
175 |
rrs.append(rr); ccs.append(cc)
176 |
rrs, ccs = np.concatenate(rrs), np.concatenate(ccs)
177 |
attention_mask[idx, rrs.astype(np.int32), ccs.astype(np.int32)] = True
178 |
179 |
points_one_y, points_one_x = points_one[:,0], points_one[:,1]
180 |
points_two_y, points_two_x = points_two[:,0], points_two[:,1]
181 |
attention_mask = masks32[points_one_y, points_one_x, points_two_y, points_two_x]
182 |
attention_mask = torch.from_numpy(attention_mask).to(src_frame.camera.device)
183 |
184 |
# reshape to (imh, imw, imh, imw)
185 |
attention_mask = attention_mask.reshape(imh * imw, imh * imw)
186 |
187 |
# stores flattened 2D attention mask
188 |
if visualize_mask:
189 |
attention_mask = attention_mask.reshape(imh * imw, imh * imw)
190 |
am_img = (attention_mask.squeeze().unsqueeze(-1).repeat(1,1,3).float().numpy() * 255).astype(np.uint8)
191 |
imageio.imsave("data/visuals/epipolar_masks/batched_mask.png", am_img)
192 |
193 |
return attention_mask
194 |
195 |
196 |
def get_opencv_from_blender(matrix_world, fov, image_size):
197 |
# convert matrix_world to opencv format extrinsics
198 |
opencv_world_to_cam = matrix_world.inverse()
199 |
opencv_world_to_cam[1, :] *= -1
200 |
opencv_world_to_cam[2, :] *= -1
201 |
R, T = opencv_world_to_cam[:3, :3], opencv_world_to_cam[:3, 3]
202 |
R, T = R.unsqueeze(0), T.unsqueeze(0)
203 |
204 |
# convert fov to opencv format intrinsics
205 |
focal = 1 / np.tan(fov / 2)
206 |
intrinsics = np.diag(np.array([focal, focal, 1])).astype(np.float32)
207 |
opencv_cam_matrix = torch.from_numpy(intrinsics).unsqueeze(0).float()
208 |
opencv_cam_matrix[:, :2, -1] += torch.tensor([image_size / 2, image_size / 2])
209 |
opencv_cam_matrix[:, [0,1], [0,1]] *= image_size / 2
210 |
211 |
return R, T, opencv_cam_matrix
212 |
213 |
214 |
def compute_plucker_embed(frame, imw, imh):
215 |
""" Computes Plucker coordinates for a Pytorch3D camera. """
216 |
217 |
# get camera center
218 |
cam_pos = frame.camera.get_camera_center()
219 |
220 |
# get ray bundle
221 |
src_ray_bundle = NDCMultinomialRaysampler(
222 |
223 |
224 |
225 |
226 |
227 |
228 |
229 |
# get ray directions
230 |
ray_dirs = F.normalize(src_ray_bundle.directions, dim=-1)
231 |
232 |
# get plucker coordinates
233 |
cross = torch.cross(cam_pos[:,None,None,:], ray_dirs, dim=-1)
234 |
plucker = torch.cat((ray_dirs, cross), dim=-1)
235 |
plucker = plucker.permute(0, 3, 1, 2)
236 |
237 |
return plucker # (B, 6, H, W, )
238 |
239 |
240 |
def cartesian_to_spherical(xyz):
241 |
xy = xyz[:,0]**2 + xyz[:,1]**2
242 |
z = np.sqrt(xy + xyz[:,2]**2)
243 |
theta = np.arctan2(np.sqrt(xy), xyz[:,2]) # for elevation angle defined from z-axis down
244 |
azimuth = np.arctan2(xyz[:,1], xyz[:,0])
245 |
return np.stack([theta, azimuth, z], axis=-1)
246 |
247 |
248 |
def spherical_to_cartesian(spherical_coords):
249 |
# convert from spherical to cartesian coordinates
250 |
theta, azimuth, radius = spherical_coords.T
251 |
x = radius * np.sin(theta) * np.cos(azimuth)
252 |
y = radius * np.sin(theta) * np.sin(azimuth)
253 |
z = radius * np.cos(theta)
254 |
return np.stack([x, y, z], axis=-1)
255 |
256 |
257 |
def look_at(eye, center, up):
258 |
# Create a normalized direction vector from eye to center
259 |
f = np.array(center) - np.array(eye)
260 |
f /= np.linalg.norm(f)
261 |
262 |
# Create a normalized right vector
263 |
up_norm = np.array(up) / np.linalg.norm(up)
264 |
s = np.cross(f, up_norm)
265 |
s /= np.linalg.norm(s)
266 |
267 |
# Recompute the up vector
268 |
u = np.cross(s, f)
269 |
270 |
# Create rotation matrix R
271 |
R = np.array([[s[0], s[1], s[2]],
272 |
[u[0], u[1], u[2]],
273 |
[-f[0], -f[1], -f[2]]])
274 |
275 |
# Create translation vector T
276 |
T = -np.dot(R, np.array(eye))
277 |
278 |
return R, T
279 |
280 |
281 |
def get_blender_from_spherical(elevation, azimuth):
282 |
""" Generates blender camera from spherical coordinates. """
283 |
284 |
cartesian_coords = spherical_to_cartesian(np.array([[elevation, azimuth, 3.5]]))
285 |
286 |
# get camera rotation
287 |
center = np.array([0, 0, 0])
288 |
eye = cartesian_coords[0]
289 |
up = np.array([0, 0, 1])
290 |
291 |
R, T = look_at(eye, center, up)
292 |
R = R.T; T = -np.dot(R, T)
293 |
RT = np.concatenate([R, T.reshape(3,1)], axis=-1)
294 |
295 |
blender_cam = torch.from_numpy(RT).float()
296 |
blender_cam = torch.cat([blender_cam, torch.tensor([[0, 0, 0, 1]])], axis=0)
297 |
return blender_cam
298 |
299 |
300 |
def get_mask_and_plucker(src_frame, tgt_frame, image_size, dialate_mask=True, debug_depth=False, visualize_mask=False):
301 |
""" Given a pair of source and target frames (blender outputs), returns the epipolar attention masks and plucker embeddings."""
302 |
303 |
# get pytorch3d frames (blender to opencv, then opencv to pytorch3d)
304 |
src_R, src_T, src_intrinsics = get_opencv_from_blender(src_frame["camera"], src_frame["fov"], image_size)
305 |
src_camera_pytorch3d = cameras_from_opencv_projection(src_R, src_T, src_intrinsics, torch.tensor([image_size, image_size]).float().unsqueeze(0))
306 |
src_frame.update({"camera": src_camera_pytorch3d})
307 |
308 |
tgt_R, tgt_T, tgt_intrinsics = get_opencv_from_blender(tgt_frame["camera"], tgt_frame["fov"], image_size)
309 |
tgt_camera_pytorch3d = cameras_from_opencv_projection(tgt_R, tgt_T, tgt_intrinsics, torch.tensor([image_size, image_size]).float().unsqueeze(0))
310 |
tgt_frame.update({"camera": tgt_camera_pytorch3d})
311 |
312 |
# compute epipolar masks
313 |
image_height, image_width = image_size, image_size
314 |
src_mask = compute_epipolar_mask(src_frame, tgt_frame, image_height, image_width, dialate_mask, debug_depth, visualize_mask)
315 |
tgt_mask = compute_epipolar_mask(tgt_frame, src_frame, image_height, image_width, dialate_mask, debug_depth, visualize_mask)
316 |
317 |
# compute plucker coordinates
318 |
src_plucker = compute_plucker_embed(src_frame, image_height, image_width).squeeze()
319 |
tgt_plucker = compute_plucker_embed(tgt_frame, image_height, image_width).squeeze()
320 |
321 |
return src_mask, tgt_mask, src_plucker, tgt_plucker
322 |
323 |
324 |
def get_batch_from_spherical(elevations, azimuths, fov=0.702769935131073, image_size=256):
325 |
"""Given a list of elevations and azimuths, generates cameras, computes epipolar masks and plucker embeddings and organizes them as a batch."""
326 |
327 |
num_views = len(elevations)
328 |
latent_size = image_size // 8
329 |
assert len(elevations) == len(azimuths)
330 |
331 |
# intialize all epipolar masks to ones (i.e. all pixels are considered)
332 |
batch_attention_masks = torch.ones(num_views, num_views, latent_size ** 2, latent_size ** 2, dtype=torch.bool)
333 |
plucker_embeds = [None for _ in range(num_views)]
334 |
335 |
# compute pairwise mask and plucker
336 |
for i, icam in enumerate(zip(elevations, azimuths)):
337 |
for j, jcam in enumerate(zip(elevations, azimuths)):
338 |
if i == j: continue
339 |
340 |
first_frame = edict({"fov": fov}); second_frame = edict({"fov": fov})
341 |
first_frame["camera"] = get_blender_from_spherical(elevation=icam[0], azimuth=icam[1])
342 |
second_frame["camera"] = get_blender_from_spherical(elevation=jcam[0], azimuth=jcam[1])
343 |
first_mask, second_mask, first_plucker, second_plucker = get_mask_and_plucker(first_frame, second_frame, latent_size, dialate_mask=True)
344 |
345 |
batch_attention_masks[i, j], batch_attention_masks[j, i] = first_mask, second_mask
346 |
plucker_embeds[i], plucker_embeds[j] = first_plucker, second_plucker
347 |
348 |
# organize as batch
349 |
batch = {}
350 |
batch_attention_masks = rearrange(batch_attention_masks, 'b1 b2 h w -> (b1 h) (b2 w)')
351 |
batch["epi_constraint_masks"] = batch_attention_masks
352 |
batch["plucker_embeds"] = torch.stack(plucker_embeds)
353 |
354 |
return batch