josedolot's picture
Upload hybridnets/dataset.py
549e090
import cv2
import numpy as np
# np.set_printoptions(threshold=np.inf)
import random
import torch
import torchvision.transforms as transforms
from pathlib import Path
from torch.utils.data import Dataset
from utils.utils import letterbox, augment_hsv, random_perspective
from tqdm.autonotebook import tqdm
import json
import albumentations as A
class BddDataset(Dataset):
def __init__(self, params, is_train, inputsize=640, transform=None):
"""
initial all the characteristic
Inputs:
-params: configuration parameters
-is_train(bool): whether train set or not
-transform: ToTensor and Normalize
Returns:
None
"""
self.single_cls = True # just detect vehicle
self.is_train = is_train
self.params = params
self.transform = transform
self.inputsize = inputsize
self.Tensor = transforms.ToTensor()
img_root = Path(params.dataset['dataroot'])
label_root = Path(params.dataset['labelroot'])
mask_root = Path(params.dataset['maskroot'])
lane_root = Path(params.dataset['laneroot'])
if is_train:
indicator = params.dataset['train_set']
else:
indicator = params.dataset['test_set']
self.img_root = img_root / indicator
self.label_root = label_root / indicator
self.mask_root = mask_root / indicator
self.lane_root = lane_root / indicator
# self.label_list = self.label_root.iterdir()
self.mask_list = self.mask_root.iterdir()
self.data_format = params.dataset['data_format']
self.scale_factor = params.dataset['scale_factor']
self.rotation_factor = params.dataset['rot_factor']
self.flip = params.dataset['flip']
self.color_rgb = params.dataset['color_rgb']
self.albumentations_transform = A.Compose([
A.Blur(p=0.01),
A.MedianBlur(p=0.01),
A.ToGray(p=0.01),
A.CLAHE(p=0.01),
A.RandomBrightnessContrast(p=0.01),
A.RandomGamma(p=0.01),
A.ImageCompression(quality_lower=75, p=0.01)],
bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels']),
additional_targets={'mask0': 'mask'})
# bdd_labels = {
# 'unlabeled':0, 'dynamic': 1, 'ego vehicle': 2, 'ground': 3,
# 'static': 4, 'parking': 5, 'rail track': 6, 'road': 7,
# 'sidewalk': 8, 'bridge': 9, 'building': 10, 'fence': 11,
# 'garage': 12, 'guard rail': 13, 'tunnel': 14, 'wall': 15,
# 'banner': 16, 'billboard': 17, 'lane divider': 18,'parking sign': 19,
# 'pole': 20, 'polegroup': 21, 'street light': 22, 'traffic cone': 23,
# 'traffic device': 24, 'traffic light': 25, 'traffic sign': 26, 'traffic sign frame': 27,
# 'terrain': 28, 'vegetation': 29, 'sky': 30, 'person': 31,
# 'rider': 32, 'bicycle': 33, 'bus': 34, 'car': 35,
# 'caravan': 36, 'motorcycle': 37, 'trailer': 38, 'train': 39,
# 'truck': 40
# }
self.id_dict = {'person': 0, 'rider': 1, 'car': 2, 'bus': 3, 'truck': 4,
'bike': 5, 'motor': 6, 'tl_green': 7, 'tl_red': 8,
'tl_yellow': 9, 'tl_none': 10, 'traffic sign': 11, 'train': 12}
self.id_dict_single = {'car': 0, 'bus': 1, 'truck': 2, 'train': 3}
# id_dict = {'car': 0, 'bus': 1, 'truck': 2}
self.shapes = np.array(params.dataset['org_img_size'])
self.db = self._get_db()
def _get_db(self):
"""
get database from the annotation file
Inputs:
Returns:
gt_db: (list)database [a,b,c,...]
a: (dictionary){'image':, 'information':, ......}
image: image path
mask: path of the segmetation label
label: [cls_id, center_x//256, center_y//256, w//256, h//256] 256=IMAGE_SIZE
"""
print('building database...')
gt_db = []
height, width = self.shapes
for mask in tqdm(list(self.mask_list)):
mask_path = str(mask)
label_path = mask_path.replace(str(self.mask_root), str(self.label_root)).replace(".png", ".json")
image_path = mask_path.replace(str(self.mask_root), str(self.img_root)).replace(".png", ".jpg")
lane_path = mask_path.replace(str(self.mask_root), str(self.lane_root))
with open(label_path, 'r') as f:
label = json.load(f)
data = label['frames'][0]['objects']
data = self.select_data(data)
gt = np.zeros((len(data), 5))
for idx, obj in enumerate(data):
category = obj['category']
if category == "traffic light":
color = obj['attributes']['trafficLightColor']
category = "tl_" + color
if category in self.id_dict.keys():
x1 = float(obj['box2d']['x1'])
y1 = float(obj['box2d']['y1'])
x2 = float(obj['box2d']['x2'])
y2 = float(obj['box2d']['y2'])
cls_id = self.id_dict[category]
if self.single_cls:
cls_id = 0
gt[idx][0] = cls_id
box = self.convert((width, height), (x1, x2, y1, y2))
gt[idx][1:] = list(box)
rec = [{
'image': image_path,
'label': gt,
'mask': mask_path,
'lane': lane_path
}]
# img = cv2.imread(image_path, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION | cv2.IMREAD_UNCHANGED)
# # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# for label in gt:
# # print(label[1])
# x1 = label[1] - label[3] / 2
# x1 *= 1280
# x1 = int(x1)
# # print(x1)
# x2 = label[1] + label[3] / 2
# x2 *= 1280
# x2 = int(x2)
# y1 = label[2] - label[4] / 2
# y1 *= 720
# y1 = int(y1)
# y2 = label[2] + label[4] / 2
# y2 *= 720
# y2 = int(y2)
# img = cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
# cv2.imwrite('gt/{}'.format(image_path.split('/')[-1]), img)
gt_db += rec
print('database build finish')
return gt_db
def evaluate(self, params, preds, output_dir):
"""
finished on children dataset
"""
raise NotImplementedError
def __len__(self, ):
"""
number of objects in the dataset
"""
return len(self.db)
def __getitem__(self, idx):
"""
Get input and groud-truth from database & add data augmentation on input
Inputs:
-idx: the index of image in self.db(database)(list)
self.db(list) [a,b,c,...]
a: (dictionary){'image':, 'information':}
Returns:
-image: transformed image, first passed the data augmentation in __getitem__ function(type:numpy), then apply self.transform
-target: ground truth(det_gt,seg_gt)
function maybe useful
cv2.imread
cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
cv2.warpAffine
"""
data = self.db[idx]
img = cv2.imread(data["image"], cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
if self.params.num_seg_class == 3:
seg_label = cv2.imread(data["mask"])
else:
seg_label = cv2.imread(data["mask"], 0)
lane_label = cv2.imread(data["lane"], 0)
# print(lane_label.shape)
# print(seg_label.shape)
# print(lane_label.shape)
# print(seg_label.shape)
resized_shape = self.inputsize
if isinstance(resized_shape, list):
resized_shape = max(resized_shape)
h0, w0 = img.shape[:2] # orig hw
r = resized_shape / max(h0, w0) # resize image to img_size
if r != 1: # always resize down, only resize up if training with augmentation
interp = cv2.INTER_AREA if r < 1 else cv2.INTER_LINEAR
img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=interp)
seg_label = cv2.resize(seg_label, (int(w0 * r), int(h0 * r)), interpolation=interp)
lane_label = cv2.resize(lane_label, (int(w0 * r), int(h0 * r)), interpolation=interp)
h, w = img.shape[:2]
(img, seg_label, lane_label), ratio, pad = letterbox((img, seg_label, lane_label), resized_shape, auto=True,
scaleup=self.is_train)
shapes = (h0, w0), ((h / h0, w / w0), pad) # for COCO mAP rescaling
# ratio = (w / w0, h / h0)
# print(resized_shape)
det_label = data["label"]
# print(det_label)
labels = []
labels_app = np.array([])
if det_label.size > 0:
# Normalized xywh to pixel xyxy format
labels = det_label.copy()
labels[:, 1] = ratio[0] * w * (det_label[:, 1] - det_label[:, 3] / 2) + pad[0] # pad width
labels[:, 2] = ratio[1] * h * (det_label[:, 2] - det_label[:, 4] / 2) + pad[1] # pad height
labels[:, 3] = ratio[0] * w * (det_label[:, 1] + det_label[:, 3] / 2) + pad[0]
labels[:, 4] = ratio[1] * h * (det_label[:, 2] + det_label[:, 4] / 2) + pad[1]
# print(labels[:, 1:4])
if self.is_train:
# albumentations
try:
new = self.albumentations_transform(image=img, mask=seg_label, mask0=lane_label,
bboxes=labels[:, 1:] if len(labels) else labels,
class_labels=labels[:, 0] if len(labels) else labels)
img = new['image']
labels = np.array([[c, *b] for c, b in zip(new['class_labels'], new['bboxes'])]) if len(labels) else labels
seg_label = new['mask']
lane_label = new['mask0']
except ValueError: # bbox have width or height == 0
pass
# augmentation
combination = (img, seg_label, lane_label)
(img, seg_label, lane_label), labels = random_perspective(
combination=combination,
targets=labels,
degrees=self.params.dataset['rot_factor'],
translate=self.params.dataset['translate'],
scale=self.params.dataset['scale_factor'],
shear=self.params.dataset['shear']
)
# print(labels.shape)
augment_hsv(img, hgain=self.params.dataset['hsv_h'], sgain=self.params.dataset['hsv_s'], vgain=self.params.dataset['hsv_v'])
# img, seg_label, labels = cutout(combination=combination, labels=labels)
# random left-right flip
lr_flip = True
if lr_flip and random.random() < 0.5:
img = img[:, ::-1, :]
if len(labels):
rows, cols, channels = img.shape
x1 = labels[:, 1].copy()
x2 = labels[:, 3].copy()
x_tmp = x1.copy()
labels[:, 1] = cols - x2
labels[:, 3] = cols - x_tmp
# Segmentation
seg_label = np.fliplr(seg_label)
lane_label = np.fliplr(lane_label)
# cv2.imwrite('img0.jpg',img)
# cv2.imwrite('img1.jpg',seg_label)
# cv2.imwrite('img2.jpg',lane_label)
# exit()
# print(labels)
# random up-down flip
ud_flip = False
if ud_flip and random.random() < 0.5:
img = np.flipud(img)
seg_label = np.filpud(seg_label)
lane_label = np.filpud(lane_label)
if len(labels):
rows, cols, channels = img.shape
y1 = labels[:, 2].copy()
y2 = labels[:, 4].copy()
y_tmp = y1.copy()
labels[:, 2] = rows - y2
labels[:, 4] = rows - y_tmp
# for anno in labels:
# x1, y1, x2, y2 = [int(x) for x in anno[1:5]]
# print(x1,y1,x2,y2)
# cv2.rectangle(img, (x1,y1), (x2,y2), (0,0,255), 3)
# cv2.imwrite(data["image"].split("/")[-1], img)
if len(labels):
labels_app = np.zeros((len(labels), 5))
labels_app[:, 0:4] = labels[:, 1:5]
labels_app[:, 4] = labels[:, 0]
img = np.ascontiguousarray(img)
_, seg1 = cv2.threshold(seg_label, 1, 255, cv2.THRESH_BINARY)
_, lane1 = cv2.threshold(lane_label, 1, 255, cv2.THRESH_BINARY)
# prefer lane
seg1 = seg1 - (seg1 & lane1)
union = seg1 | lane1
# print(union.shape)
background = 255 - union
# print(img.shape)
# print(lane1.shape)
# img_copy = img.copy()
# img_copy[lane1 == 255] = (0, 255, 0)
# cv2.imwrite('seg_gt/' + data['image'].split('/')[-1], img_copy)
# cv2.imwrite('background.jpg', background)
# cv2.imwrite('{}.jpg'.format(data['image'].split('/')[-1]), img)
# cv2.imwrite('{}-lane.jpg'.format(data['image'].split('/')[-1]),lane1)
# cv2.imwrite('{}-seg.jpg'.format(data['image'].split('/')[-1]),seg1)
seg1 = self.Tensor(seg1)
lane1 = self.Tensor(lane1)
background = self.Tensor(background)
segmentation = torch.cat([background, seg1, lane1], dim=0)
# print(segmentation.size())
# print(seg1.shape)
# for anno in labels_app:
# x1, y1, x2, y2 = [int(x) for x in anno[anno != -1][:4]]
# cv2.rectangle(img, (x1,y1), (x2,y2), (0,0,255), 1)
# cv2.imwrite(data["image"].split("/")[-1], img)
img = self.transform(img)
return img, data["image"], shapes, torch.from_numpy(labels_app), segmentation
def select_data(self, db):
"""
You can use this function to filter useless images in the dataset
Inputs:
-db: (list)database
Returns:
-db_selected: (list)filtered dataset
"""
remain = []
for obj in db:
if 'box2d' in obj.keys(): # obj.has_key('box2d'):
if self.single_cls:
if obj['category'] in self.id_dict_single.keys():
remain.append(obj)
else:
remain.append(obj)
return remain
def convert(self, size, box):
dw = 1. / (size[0])
dh = 1. / (size[1])
x = (box[0] + box[1]) / 2.0
y = (box[2] + box[3]) / 2.0
w = box[1] - box[0]
h = box[3] - box[2]
x = x * dw
w = w * dw
y = y * dh
h = h * dh
return x, y, w, h
@staticmethod
def collate_fn(batch):
img, paths, shapes, labels_app, segmentation = zip(*batch)
filenames = [file.split('/')[-1] for file in paths]
# print(len(labels_app))
max_num_annots = max(label.size(0) for label in labels_app)
if max_num_annots > 0:
annot_padded = torch.ones((len(labels_app), max_num_annots, 5)) * -1
for idx, label in enumerate(labels_app):
if label.size(0) > 0:
annot_padded[idx, :label.size(0), :] = label
else:
annot_padded = torch.ones((len(labels_app), 1, 5)) * -1
# print("ABC", seg1.size())
return {'img': torch.stack(img, 0), 'annot': annot_padded, 'segmentation': torch.stack(segmentation, 0),
'filenames': filenames, 'shapes': shapes}