|
import numpy as np |
|
import open3d as o3d |
|
import os |
|
import json |
|
import time |
|
import cv2 |
|
import matplotlib.pyplot as plt |
|
from shapely.geometry import Polygon |
|
from descartes.patch import PolygonPatch |
|
import io |
|
import PIL |
|
import time |
|
|
|
from misc.figures import plot_coords |
|
from misc.colors import colormap_255, semantics_cmap |
|
from visualize_3d import visualize_floorplan |
|
|
|
from DataProcessing.PointCloudReaderPanorama import PointCloudReaderPanorama |
|
|
|
|
|
class FloorRW: |
|
def __init__(self): |
|
self.dataset_path = "./s3d_raw/" |
|
self.mode = "train" |
|
self.scenes_path = os.path.join(self.dataset_path, self.mode) |
|
|
|
self.out_folder = "floor_data_with_normals" |
|
self.density_map_file_name = "density.png" |
|
self.normals_map_file_name = "normals.png" |
|
self.anno_file_name = "annotation_3d.json" |
|
self.vis_file_name = "vis.jpg" |
|
|
|
self.coco_floor_json_path = os.path.join(self.dataset_path, self.mode + "_floor.json") |
|
|
|
|
|
self.w = 256 |
|
self.h = 256 |
|
|
|
self.invalid_scenes = ["scene_00183", "scene_01155", "scene_01816"] |
|
|
|
def generate_floors(self): |
|
scenes = os.listdir(self.scenes_path) |
|
scenes.sort() |
|
|
|
|
|
|
|
for scene_ind, scene in enumerate(scenes): |
|
|
|
|
|
if scene in self.invalid_scenes: |
|
continue |
|
|
|
|
|
|
|
print("%d / %d Current scene %s" % (scene_ind + 1, len(scenes), scene)) |
|
start_time = time.time() |
|
|
|
scene_path = os.path.join(self.scenes_path, scene) |
|
|
|
|
|
reader = PointCloudReaderPanorama(scene_path, random_level=0, generate_color=True, generate_normal=False) |
|
density_map, normals_map, normalization_dict = reader.generate_density() |
|
|
|
normalized_annotations = self.normalize_annotations(scene_path, normalization_dict) |
|
|
|
|
|
|
|
|
|
self.export_scene(scene_path, density_map, normals_map, normalized_annotations) |
|
|
|
print("Scene processing time %.3f" % (time.time() - start_time)) |
|
|
|
def generate_coco_json(self): |
|
scenes = os.listdir(self.scenes_path) |
|
scenes.sort() |
|
|
|
img_id = -1 |
|
instance_id = -1 |
|
coco_dict = {} |
|
coco_dict["images"] = [] |
|
coco_dict["annotations"] = [] |
|
coco_dict["categories"] = [{"supercategory": "room", "id": 1, "name": "room"}] |
|
for scene_ind, scene in enumerate(scenes): |
|
|
|
|
|
if scene in self.invalid_scenes: |
|
continue |
|
|
|
|
|
|
|
img_id += 1 |
|
print("%d / %d Current scene %s" % (scene_ind + 1, len(scenes), scene)) |
|
|
|
scene_path = os.path.join(self.scenes_path, scene) |
|
|
|
img_relative_path = os.path.join("./", scene, self.out_folder, self.density_map_file_name) |
|
annos_path = os.path.join(scene_path, self.out_folder, self.anno_file_name) |
|
|
|
with open(annos_path, "r") as f: |
|
annos = json.load(f) |
|
|
|
img_dict = {} |
|
img_dict["file_name"] = img_relative_path |
|
img_dict["id"] = img_id |
|
img_dict["width"] = self.w |
|
img_dict["height"] = self.h |
|
|
|
coco_annotation_dict_list = self.parse_coco_annotation(annos, instance_id, img_id) |
|
|
|
coco_dict["images"].append(img_dict) |
|
coco_dict["annotations"] += coco_annotation_dict_list |
|
instance_id += len(coco_annotation_dict_list) |
|
|
|
with open(self.coco_floor_json_path, 'w') as f: |
|
json.dump(coco_dict, f) |
|
|
|
def parse_coco_annotation(self, annos, curr_instance_id, curr_img_id): |
|
polygons = visualize_floorplan(annos, vis=False, ret=True) |
|
|
|
ignore_types = ['outwall', 'door', 'window'] |
|
|
|
coco_annotation_dict_list = [] |
|
junctions = np.array([junc['coordinate'][:2] for junc in annos['junctions']]) |
|
for (poly, poly_type) in polygons: |
|
if poly_type in ignore_types: |
|
continue |
|
|
|
poly = junctions[np.array(poly)] |
|
poly_shapely = Polygon(poly) |
|
area = poly_shapely.area |
|
|
|
|
|
if area < 100: |
|
continue |
|
|
|
rectangle_shapely = poly_shapely.envelope |
|
|
|
coco_seg_poly = [] |
|
for p in poly: |
|
coco_seg_poly += list(p) |
|
|
|
|
|
bound_pad = 5 |
|
bb_x, bb_y = rectangle_shapely.exterior.xy |
|
bb_x = np.unique(bb_x) |
|
bb_y = np.unique(bb_y) |
|
bb_x_min = np.maximum(np.min(bb_x) - bound_pad, 0) |
|
bb_y_min = np.maximum(np.min(bb_y) - bound_pad, 0) |
|
|
|
bb_x_max = np.minimum(np.max(bb_x) + bound_pad, self.w - 1) |
|
bb_y_max = np.minimum(np.max(bb_y) + bound_pad, self.h - 1) |
|
|
|
bb_width = (bb_x_max - bb_x_min) |
|
bb_height = (bb_y_max - bb_y_min) |
|
|
|
coco_bb = [bb_x_min, bb_y_min, bb_width, bb_height] |
|
|
|
curr_instance_id += 1 |
|
coco_annotation_dict = { |
|
"segmentation": [coco_seg_poly], |
|
"area": area, |
|
"iscrowd": 0, |
|
"image_id": curr_img_id, |
|
"bbox": coco_bb, |
|
"category_id": 1, |
|
"id": curr_instance_id} |
|
coco_annotation_dict_list.append(coco_annotation_dict) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return coco_annotation_dict_list |
|
|
|
|
|
|
|
def export_scene(self, scene_path, density_map, normals_map, annos): |
|
def export_density(): |
|
density_path = os.path.join(floorplan_folder_path, self.density_map_file_name) |
|
density_uint8 = (density_map * 255).astype(np.uint8) |
|
cv2.imwrite(density_path, density_uint8) |
|
|
|
def export_normals(): |
|
normals_path = os.path.join(floorplan_folder_path, self.normals_map_file_name) |
|
normals_uint8 = (np.clip(normals_map, 0, 1) * 255).astype(np.uint8) |
|
cv2.imwrite(normals_path, normals_uint8) |
|
|
|
def export_annos(): |
|
anno_path = os.path.join(floorplan_folder_path, self.anno_file_name) |
|
with open(anno_path, 'w') as f: |
|
json.dump(annos, f) |
|
|
|
def export_vis(): |
|
vis_path = os.path.join(floorplan_folder_path, self.vis_file_name) |
|
vis = self.vis_scene_data(density_map, annos, show=False) |
|
if vis is not None: |
|
cv2.imwrite(vis_path, vis) |
|
else: |
|
print("Visualization is None. Skip exporting the visualization...") |
|
|
|
floorplan_folder_path = os.path.join(scene_path, self.out_folder) |
|
if not os.path.isdir(floorplan_folder_path): |
|
os.mkdir(floorplan_folder_path) |
|
|
|
export_density() |
|
export_normals() |
|
export_annos() |
|
export_vis() |
|
|
|
def normalize_annotations(self, scene_path, normalization_dict): |
|
annotation_path = os.path.join(scene_path, "annotation_3d.json") |
|
with open(annotation_path, "r") as f: |
|
annotation_json = json.load(f) |
|
|
|
for line in annotation_json["lines"]: |
|
point = line["point"] |
|
point = self.normalize_point(point, normalization_dict) |
|
line["point"] = point |
|
|
|
for junction in annotation_json["junctions"]: |
|
point = junction["coordinate"] |
|
point = self.normalize_point(point, normalization_dict) |
|
junction["coordinate"] = point |
|
|
|
normalization_dict["min_coords"] = normalization_dict["min_coords"].tolist() |
|
normalization_dict["max_coords"] = normalization_dict["max_coords"].tolist() |
|
normalization_dict["image_res"] = normalization_dict["image_res"].tolist() |
|
|
|
return annotation_json |
|
|
|
def normalize_point(self, point, normalization_dict): |
|
|
|
min_coords = normalization_dict["min_coords"] |
|
max_coords = normalization_dict["max_coords"] |
|
image_res = normalization_dict["image_res"] |
|
|
|
point_2d = \ |
|
np.round( |
|
(point[:2] - min_coords[:2]) / (max_coords[:2] - min_coords[:2]) * image_res) |
|
point_2d = np.minimum(np.maximum(point_2d, np.zeros_like(image_res)), |
|
image_res - 1) |
|
|
|
point[:2] = point_2d.tolist() |
|
|
|
return point |
|
|
|
def vis_scene_data(self, density_map, annos, show=True): |
|
polygons = visualize_floorplan(annos, vis=False, ret=True) |
|
|
|
if polygons is None: |
|
return None |
|
|
|
fig = plt.figure() |
|
gs = fig.add_gridspec(1, 2) |
|
|
|
ax0 = fig.add_subplot(gs[0, 0]) |
|
ax0.imshow(density_map) |
|
plt.axis('equal') |
|
plt.axis('off') |
|
|
|
ax1 = fig.add_subplot(gs[0, 1]) |
|
|
|
junctions = np.array([junc['coordinate'][:2] for junc in annos['junctions']]) |
|
for (polygon, poly_type) in polygons: |
|
polygon = Polygon(junctions[np.array(polygon)]) |
|
plot_coords(ax1, polygon.exterior, alpha=0.5) |
|
if poly_type == 'outwall': |
|
patch = PolygonPatch(polygon, facecolor=semantics_cmap[poly_type], alpha=0) |
|
else: |
|
patch = PolygonPatch(polygon, facecolor=semantics_cmap[poly_type], alpha=0.5) |
|
ax1.add_patch(patch) |
|
|
|
ax1.set_ylim(density_map.shape[0], 0) |
|
ax1.set_xlim(0, density_map.shape[1]) |
|
plt.axis('equal') |
|
plt.axis('off') |
|
|
|
buf = io.BytesIO() |
|
plt.savefig(buf, format='png', bbox_inches='tight') |
|
buf.seek(0) |
|
|
|
if show: |
|
plt.show() |
|
plt.close() |
|
|
|
vis = PIL.Image.open(buf) |
|
vis = np.array(vis) |
|
return vis |
|
|