import torch import numpy as np import math import base64 import collections import pycocotools.mask as mask_utils from maskrcnn_benchmark.structures.bounding_box import BoxList from maskrcnn_benchmark.structures.segmentation_mask import SegmentationMask class LabelLoader(object): def __init__( self, labelmap, extra_fields=(), filter_duplicate_relations=False, ignore_attr=None, ignore_rel=None, mask_mode="poly", ): self.labelmap = labelmap self.extra_fields = extra_fields self.supported_fields = ["class", "conf", "attributes", "scores_all", "boxes_all", "feature", "mask"] self.filter_duplicate_relations = filter_duplicate_relations self.ignore_attr = set(ignore_attr) if ignore_attr != None else set() self.ignore_rel = set(ignore_rel) if ignore_rel != None else set() assert mask_mode == "poly" or mask_mode == "mask" self.mask_mode = mask_mode def __call__(self, annotations, img_size, remove_empty=False, load_fields=None): boxes = [obj["rect"] for obj in annotations] boxes = torch.as_tensor(boxes).reshape(-1, 4) target = BoxList(boxes, img_size, mode="xyxy") if load_fields is None: load_fields = self.extra_fields for field in load_fields: assert field in self.supported_fields, "Unsupported field {}".format(field) if field == "class": classes = self.add_classes(annotations) target.add_field("labels", classes) elif field == "conf": confidences = self.add_confidences(annotations) target.add_field("scores", confidences) elif field == "attributes": attributes = self.add_attributes(annotations) target.add_field("attributes", attributes) elif field == "scores_all": scores_all = self.add_scores_all(annotations) target.add_field("scores_all", scores_all) elif field == "boxes_all": boxes_all = self.add_boxes_all(annotations) target.add_field("boxes_all", boxes_all) elif field == "feature": features = self.add_features(annotations) target.add_field("box_features", features) elif field == "mask": masks, is_box_mask = self.add_masks(annotations, img_size) target.add_field("masks", masks) target.add_field("is_box_mask", is_box_mask) target = target.clip_to_image(remove_empty=remove_empty) return target def get_box_mask(self, rect, img_size): x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3] if self.mask_mode == "poly": return [[x1, y1, x1, y2, x2, y2, x2, y1]] elif self.mask_mode == "mask": # note the order of height/width order in mask is opposite to image mask = np.zeros([img_size[1], img_size[0]], dtype=np.uint8) mask[math.floor(y1) : math.ceil(y2), math.floor(x1) : math.ceil(x2)] = 255 encoded_mask = mask_utils.encode(np.asfortranarray(mask)) encoded_mask["counts"] = encoded_mask["counts"].decode("utf-8") return encoded_mask def add_masks(self, annotations, img_size): masks = [] is_box_mask = [] for obj in annotations: if "mask" in obj: masks.append(obj["mask"]) is_box_mask.append(0) else: masks.append(self.get_box_mask(obj["rect"], img_size)) is_box_mask.append(1) masks = SegmentationMask(masks, img_size, mode=self.mask_mode) is_box_mask = torch.tensor(is_box_mask) return masks, is_box_mask def add_classes(self, annotations): class_names = [obj["class"] for obj in annotations] classes = [None] * len(class_names) for i in range(len(class_names)): classes[i] = self.labelmap["class_to_ind"][class_names[i]] return torch.tensor(classes) def add_confidences(self, annotations): confidences = [] for obj in annotations: if "conf" in obj: confidences.append(obj["conf"]) else: confidences.append(1.0) return torch.tensor(confidences) def add_attributes(self, annotations): # the maximal number of attributes per object is 16 attributes = [[0] * 16 for _ in range(len(annotations))] for i, obj in enumerate(annotations): for j, attr in enumerate(obj["attributes"]): attributes[i][j] = self.labelmap["attribute_to_ind"][attr] return torch.tensor(attributes) def add_features(self, annotations): features = [] for obj in annotations: features.append(np.frombuffer(base64.b64decode(obj["feature"]), np.float32)) return torch.tensor(features) def add_scores_all(self, annotations): scores_all = [] for obj in annotations: scores_all.append(np.frombuffer(base64.b64decode(obj["scores_all"]), np.float32)) return torch.tensor(scores_all) def add_boxes_all(self, annotations): boxes_all = [] for obj in annotations: boxes_all.append(np.frombuffer(base64.b64decode(obj["boxes_all"]), np.float32).reshape(-1, 4)) return torch.tensor(boxes_all) def relation_loader(self, relation_annos, target): if self.filter_duplicate_relations: # Filter out dupes! all_rel_sets = collections.defaultdict(list) for triplet in relation_annos: all_rel_sets[(triplet["subj_id"], triplet["obj_id"])].append(triplet) relation_annos = [np.random.choice(v) for v in all_rel_sets.values()] # get M*M pred_labels relation_triplets = [] relations = torch.zeros([len(target), len(target)], dtype=torch.int64) for i in range(len(relation_annos)): if len(self.ignore_rel) != 0 and relation_annos[i]["class"] in self.ignore_rel: continue subj_id = relation_annos[i]["subj_id"] obj_id = relation_annos[i]["obj_id"] predicate = self.labelmap["relation_to_ind"][relation_annos[i]["class"]] relations[subj_id, obj_id] = predicate relation_triplets.append([subj_id, obj_id, predicate]) relation_triplets = torch.tensor(relation_triplets) target.add_field("relation_labels", relation_triplets) target.add_field("pred_labels", relations) return target class BoxLabelLoader(object): def __init__(self, labelmap, extra_fields=(), ignore_attrs=(), mask_mode="poly"): self.labelmap = labelmap self.extra_fields = extra_fields self.ignore_attrs = ignore_attrs assert mask_mode == "poly" or mask_mode == "mask" self.mask_mode = mask_mode self.all_fields = ["class", "mask", "confidence", "attributes_encode", "IsGroupOf", "IsProposal"] def __call__(self, annotations, img_size, remove_empty=True): boxes = [obj["rect"] for obj in annotations] boxes = torch.as_tensor(boxes).reshape(-1, 4) target = BoxList(boxes, img_size, mode="xyxy") for field in self.extra_fields: assert field in self.all_fields, "Unsupported field {}".format(field) if field == "class": classes = self.add_classes_with_ignore(annotations) target.add_field("labels", classes) elif field == "mask": masks, is_box_mask = self.add_masks(annotations, img_size) target.add_field("masks", masks) target.add_field("is_box_mask", is_box_mask) elif field == "confidence": confidences = self.add_confidences(annotations) target.add_field("confidences", confidences) elif field == "attributes_encode": attributes = self.add_attributes(annotations) target.add_field("attributes", attributes) elif field == "IsGroupOf": is_group = [1 if "IsGroupOf" in obj and obj["IsGroupOf"] == 1 else 0 for obj in annotations] target.add_field("IsGroupOf", torch.tensor(is_group)) elif field == "IsProposal": is_proposal = [1 if "IsProposal" in obj and obj["IsProposal"] == 1 else 0 for obj in annotations] target.add_field("IsProposal", torch.tensor(is_proposal)) target = target.clip_to_image(remove_empty=remove_empty) return target def add_classes_with_ignore(self, annotations): class_names = [obj["class"] for obj in annotations] classes = [None] * len(class_names) if self.ignore_attrs: for i, obj in enumerate(annotations): if any([obj[attr] for attr in self.ignore_attrs if attr in obj]): classes[i] = -1 for i, cls in enumerate(classes): if cls != -1: classes[i] = self.labelmap[class_names[i]] + 1 # 0 is saved for background return torch.tensor(classes) def add_masks(self, annotations, img_size): masks = [] is_box_mask = [] for obj in annotations: if "mask" in obj: masks.append(obj["mask"]) is_box_mask.append(0) else: masks.append(self.get_box_mask(obj["rect"], img_size)) is_box_mask.append(1) masks = SegmentationMask(masks, img_size, mode=self.mask_mode) is_box_mask = torch.tensor(is_box_mask) return masks, is_box_mask def get_box_mask(self, rect, img_size): x1, y1, x2, y2 = rect[0], rect[1], rect[2], rect[3] if self.mask_mode == "poly": return [[x1, y1, x1, y2, x2, y2, x2, y1]] elif self.mask_mode == "mask": # note the order of height/width order in mask is opposite to image mask = np.zeros([img_size[1], img_size[0]], dtype=np.uint8) mask[math.floor(y1) : math.ceil(y2), math.floor(x1) : math.ceil(x2)] = 255 encoded_mask = mask_utils.encode(np.asfortranarray(mask)) encoded_mask["counts"] = encoded_mask["counts"].decode("utf-8") return encoded_mask def add_confidences(self, annotations): confidences = [] for obj in annotations: if "confidence" in obj: confidences.append(obj["confidence"]) elif "conf" in obj: confidences.append(obj["conf"]) else: confidences.append(1.0) return torch.tensor(confidences) def add_attributes(self, annotations): # we know that the maximal number of attributes per object is 16 attributes = [[0] * 16 for _ in range(len(annotations))] for i, obj in enumerate(annotations): attributes[i][: len(obj["attributes_encode"])] = obj["attributes_encode"] return torch.tensor(attributes)