# Vendored from the Hugging Face repository "zdou0830/desco" (commit 749745d).
# Utilities for converting object detection data into grounding data
import numpy as np
import torch
import pdb, json, random, re
from maskrcnn_benchmark.structures.bounding_box import BoxList
from maskrcnn_benchmark.data.datasets.tsv import load_from_yaml_file
from collections import defaultdict
from tqdm import tqdm
from maskrcnn_benchmark.data.datasets.parse_gpt import GPTOutputParser
def chunks(lst, n):
    """Split `lst` into a list of successive chunks of at most `n` items each."""
    pieces = [lst[start:start + n] for start in range(0, len(lst), n)]
    # Sanity check: the chunks must cover every element of `lst` exactly once.
    total = sum(len(piece) for piece in pieces)
    assert total == len(lst)
    return pieces
def clean_name(name):
    """Turn a raw category name into readable caption text.

    Parenthesized qualifiers are dropped and underscores become spaces.
    A "object:part" style name is rendered as "part of object".
    """
    def _scrub(text):
        text = re.sub(r"\(.*\)", "", text)
        text = re.sub(r"_", " ", text)
        text = re.sub(r" ", " ", text)
        return text

    if ":" not in name:
        return _scrub(name)
    obj_name, part_name = name.split(":")
    return _scrub(part_name) + " of " + _scrub(obj_name)
def clean_string(input_string):
    """Strip surrounding whitespace, then drop one trailing ';' and one trailing '.'.

    The ";" is removed before the ".", so e.g. "x.;" becomes "x".
    """
    return re.sub(r"\.$", "", re.sub(r";$", "", input_string.strip()))
class DetectionToGrounding():
    '''
    Convert detection data into grounding data;
    Construct prompts for training and inference;
    '''
    def __init__(self, version):
        # NOTE(review): `version` is currently unused and the class holds no
        # state — it is an empty placeholder; the actual conversion logic
        # lives in the module-level functions below.
        pass
def sanity_check_target_after_processing(target):
    """Assert that the box tensor and the "boxes" extra field stayed in sync."""
    n_boxes = len(target.bbox)
    n_box_fields = len(target.extra_fields["boxes"])
    assert n_boxes == n_box_fields
def convert_od_to_grounding_simple(
        target,
        image_id,
        ind_to_class,
        disable_shuffle=True,
        add_detection_prompt=False,
        separation_tokens=" ",
        caption_prompt=None):
    """
    Convert object detection data into grounding data format, on the fly.
    ind_to_class: {0: "__background__", 1 : "person" ...}, contiguous id

    Args (review notes):
        target: BoxList-like object exposing .bbox, .area() and
            extra_fields["labels"] — assumed, confirm against callers.
        image_id: copied verbatim into every converted annotation.
        disable_shuffle: when True, class order in the caption is sorted key
            order (deterministic captions for evaluation).
        add_detection_prompt: prefix the caption with "object detection : ".
        separation_tokens: string inserted between class names.
        caption_prompt: optional per-class dicts with 'prefix'/'name'/'suffix'
            overriding class names; only allowed when shuffling is disabled.

    Returns:
        (new_target, pheso_caption, greenlight_span_for_masked_lm_objective)
        where new_target is a list of COCO-style annotation dicts whose
        "tokens_positive" are [start, end) character spans into pheso_caption.
    """
    def generate_sentence_from_labels(positive_label_list, negative_label_list, disable_shuffle=True):
        # Maps each label id to its [start_char, end_char] span in the caption.
        label_to_positions = {}
        label_list = negative_label_list + positive_label_list
        if not disable_shuffle:
            random.shuffle(label_list)
            assert (caption_prompt is None), "Should not specify caption_prompt when shuffle is enabled!!"  # avoid potential bug
        if add_detection_prompt:
            pheso_caption = "object detection : "
        else:
            pheso_caption = ""
        for index, label in enumerate(label_list):
            if caption_prompt is not None:
                pheso_caption += caption_prompt[index]['prefix']
            start_index = len(pheso_caption)
            if caption_prompt is not None:
                pheso_caption += clean_name(caption_prompt[index]['name'])
            else:
                pheso_caption += clean_name(ind_to_class[label])  # NOTE: slight change...
            end_index = len(pheso_caption)
            if caption_prompt is not None:
                pheso_caption += caption_prompt[index]['suffix']
            # e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17
            # label_to_positions: {4: (0, 3), 17: (4, 7)}
            label_to_positions[label] = [start_index, end_index]
            if index != len(label_list) - 1:
                pheso_caption += separation_tokens
        return label_to_positions, pheso_caption

    # NOTE(review): despite the trailing comment, this keeps EVERY key of
    # ind_to_class — if key 0 is "__background__" it ends up in the caption
    # (contrast with the [1:] slice used in
    # convert_object_detection_to_grounding_optimized_for_od). Confirm callers
    # pass a background-free mapping here.
    label_list = list(sorted(ind_to_class.keys()))  # do not include the background
    label_to_positions, pheso_caption = generate_sentence_from_labels(
        positive_label_list=label_list,
        negative_label_list=[],
        disable_shuffle=disable_shuffle
    )
    new_target = []
    '''
    Convert into:
    {'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]}
    tokens_positive is the char position
    '''
    areas = target.area()
    greenlight_span_for_masked_lm_objective = []
    for i in range(len(target)):
        new_target_i = {}
        # NOTE(review): areas[i] is stored as a tensor scalar (no .item());
        # presumably downstream code tolerates this — confirm if serialized.
        new_target_i["area"] = areas[i]
        new_target_i["iscrowd"] = 0
        new_target_i["image_id"] = image_id
        new_target_i["category_id"] = target.extra_fields["labels"][i].item()
        new_target_i["id"] = None
        new_target_i['bbox'] = target.bbox[i].numpy().tolist()
        label_i = target.extra_fields["labels"][i].item()
        if label_i in label_to_positions:  # NOTE: Only add those that actually appear in the final caption
            new_target_i["tokens_positive"] = [label_to_positions[label_i]]
            new_target.append(new_target_i)
            greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i])
    return new_target, pheso_caption, greenlight_span_for_masked_lm_objective
def check_for_positive_overflow(target, ind_to_class, tokenizer, max_seq_length=256):
    """Drop positive labels (and their boxes) that overflow the token budget.

    Builds a caption by appending the positive class names ("dog. cat. ...")
    in random order and keeps only the labels whose cumulative token length
    fits within max_seq_length; boxes of dropped labels are filtered out.

    Args:
        target: BoxList-like object with extra_fields["labels"] supporting
            indexing by a torch.LongTensor.
        ind_to_class: contiguous-id -> class-name mapping.
        tokenizer: object with a .tokenize(str) -> list method.
        max_seq_length: token budget for the positive part of the caption.

    Returns:
        (filtered_target, length) — length is the token count of the KEPT
        labels' caption only.
    """
    # NOTE: Only call this function for OD data; DO NOT USE IT FOR GROUNDING DATA
    # NOTE: called only in coco_dt
    positive_label_set = set()
    for i in range(len(target)):
        positive_label_set.add(target.extra_fields["labels"][i].item())
    positive_label_list = list(positive_label_set)
    # Random shuffle so different annotations are sampled at different epochs.
    random.shuffle(positive_label_list)
    kept_labels = set()
    length = 0
    for label in positive_label_list:
        label_text = clean_name(ind_to_class[label]) + ". "  # "dog. "
        tokenized = tokenizer.tokenize(label_text)
        # BUGFIX: previously `length` was incremented before the overflow
        # check, so the returned length also counted the tokens of the first
        # *rejected* label. Now `length` reflects only the kept labels.
        if length + len(tokenized) > max_seq_length:
            break
        length += len(tokenized)
        kept_labels.add(label)
    # Filter out boxes whose label did not survive the length screen.
    keep_box_index = [i for i in range(len(target))
                      if target.extra_fields["labels"][i].item() in kept_labels]
    target = target[torch.LongTensor(keep_box_index)]  # filter boxes
    return target, length
def _label_drop_with_length_limit(label_list, ind_to_class, length_limit, tokenizer):
    """Randomly drop labels so the tokenized caption fits within length_limit.

    Args:
        label_list: candidate label ids (NOT modified; a shuffled copy is used).
        ind_to_class: contiguous-id -> class-name mapping.
        length_limit: token budget for the "name. name. ..." caption.
        tokenizer: object with a .tokenize(str) -> list method.

    Returns:
        The subset of labels (in shuffled order) that fits the budget.
    """
    screened_label_list = []
    # BUGFIX: shuffle a copy so the caller's list/array is not mutated in
    # place (random.shuffle works in place). RNG consumption is unchanged.
    shuffled = list(label_list)
    random.shuffle(shuffled)  # randomly drop labels
    for label in shuffled:
        label_text = clean_name(ind_to_class[label]) + ". "  # "dog. "
        length_limit -= len(tokenizer.tokenize(label_text))
        # A label that leaves zero (or negative) remaining budget is dropped,
        # matching the original strict "> 0" keep condition.
        if length_limit <= 0:
            break
        screened_label_list.append(label)  # keep this label
    return screened_label_list
def _randomv1_od_to_grounding(all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer):
    """Sample a random subset of labels, then screen it by token-length budget.

    Between 1 and max_num_labels - 1 distinct labels are drawn uniformly
    (np.random.randint excludes the high end), then trimmed so the resulting
    caption fits in max_seq_length tokens.
    """
    sampled = np.random.choice(all_labels, np.random.randint(1, max_num_labels), replace=False)
    return _label_drop_with_length_limit(sampled, ind_to_class, max_seq_length, tokenizer)
def _randomv2_od_to_grounding(all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer, positive_label_set):
    """Mix positives and negatives for one training caption, then length-screen.

    Sampling scheme (one uniform draw decides the mode):
        p < 0.8 : all positives + enough negatives to reach max_num_labels;
        p < 0.9 : all positives + a random number (>= 1) of negatives;
        else    : a random number (>= 1) of positives + the full negative budget.
    """
    full_positive = len(positive_label_set)
    full_negative = max_num_labels - full_positive
    # NOTE(review): full_negative can go negative when the positives alone
    # exceed max_num_labels; the [:num_negatives] slice below then trims from
    # the end of the list — presumably benign, confirm with callers.
    num_positives, num_negatives = full_positive, full_negative
    outer_prob = random.random()
    if outer_prob >= 0.9:
        num_positives = np.random.choice(max(1, full_positive)) + 1  # minimum 1
    elif outer_prob >= 0.8:
        num_negatives = np.random.choice(max(1, full_negative)) + 1  # minimum 1
    # Shuffle then truncate the negatives we keep.
    negative_label_list = [label for label in all_labels if label not in positive_label_set]
    random.shuffle(negative_label_list)
    kept_negatives = negative_label_list[:num_negatives]
    # Shuffle then truncate the positives we keep.
    kept_positives = list(positive_label_set)
    random.shuffle(kept_positives)
    kept_positives = kept_positives[:num_positives]
    # Final pass: drop labels that overflow the token-length budget.
    return _label_drop_with_length_limit(kept_positives + kept_negatives, ind_to_class, max_seq_length, tokenizer)
def od_to_grounding_optimized_streamlined(
        target,
        image_id,
        ind_to_class,
        tokenizer,
        od_to_grounding_version,
):
    """
    Streamlined OD -> grounding conversion with randomized label sampling.

    od_to_grounding_version selects the sampling strategy:
        "random.v1": sample 1..(max_num_labels - 1) labels uniformly from all
            classes (see _randomv1_od_to_grounding);
        "random.v2": keep (most of) the image's positives and pad with
            negatives (see _randomv2_od_to_grounding).
    Any other version raises NotImplementedError.

    Returns:
        (new_target, pheso_caption, greenlight_span_for_masked_lm_objective,
         label_to_positions, new_target_boxlist) — new_target is a list of
        COCO-style dicts; new_target_boxlist is a rebuilt BoxList holding only
        the boxes whose label appears in the generated caption.
    """
    # Per-version caption budget: separator between class names, max number
    # of classes, and max caption length in tokens.
    if od_to_grounding_version == "random.v1":
        separation_tokens = ". "
        max_num_labels = 85
        max_seq_length = 254
    elif od_to_grounding_version == "random.v2":
        separation_tokens = ". "
        max_num_labels = 60
        max_seq_length = 254

    def generate_senetence_given_labels(
            label_list,
            disable_shuffle=False,
    ):
        # Build the caption and record each label's [start, end) char span.
        label_to_positions = {}
        if not disable_shuffle:
            random.shuffle(label_list)
        pheso_caption = ""
        for index, label in enumerate(label_list):
            start_index = len(pheso_caption)
            pheso_caption += clean_name(ind_to_class[label])  # NOTE: slight change...
            end_index = len(pheso_caption)
            # e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17
            # label_to_positions: {4: (0, 3), 17: (4, 7)}
            label_to_positions[label] = [start_index, end_index]
            if index != len(label_list) - 1:
                pheso_caption += separation_tokens
        return label_to_positions, pheso_caption

    if od_to_grounding_version == "random.v1":
        # all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer
        screened_label_list = _randomv1_od_to_grounding(
            all_labels = list(ind_to_class.keys()),
            ind_to_class = ind_to_class,
            max_seq_length = max_seq_length,
            max_num_labels = max_num_labels,
            tokenizer = tokenizer,
        )
        label_to_positions, pheso_caption = generate_senetence_given_labels(
            label_list=screened_label_list, )
    elif od_to_grounding_version == "random.v2":
        screened_label_list = _randomv2_od_to_grounding(
            all_labels = list(ind_to_class.keys()),
            ind_to_class = ind_to_class,
            max_seq_length = max_seq_length,
            max_num_labels = max_num_labels,
            tokenizer = tokenizer,
            positive_label_set = set(target.extra_fields["labels"].tolist()),
        )
        label_to_positions, pheso_caption = generate_senetence_given_labels(
            label_list=screened_label_list, )
    else:
        raise NotImplementedError
    new_target = []
    '''
    Convert into:
    {'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]}
    tokens_positive is the char position
    '''
    areas = target.area()
    greenlight_span_for_masked_lm_objective = []
    for i in range(len(target)):
        new_target_i = {}
        # NOTE(review): areas[i] is stored as a tensor scalar (no .item()).
        new_target_i["area"] = areas[i]
        new_target_i["iscrowd"] = 0
        new_target_i["image_id"] = image_id
        new_target_i["category_id"] = target.extra_fields["labels"][i].item()
        new_target_i["id"] = None
        new_target_i['bbox'] = target.bbox[i].numpy().tolist()
        label_i = target.extra_fields["labels"][i].item()
        new_target_i["original_od_label"] = label_i
        if label_i in label_to_positions:  # NOTE: Only add labels that actually appear in the final caption
            new_target_i["tokens_positive"] = [label_to_positions[label_i]]
            new_target.append(new_target_i)
            greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i])
    # Reconstruct the target: boxes whose label was dropped from the caption
    # are dropped here too, keeping boxes and spans consistent.
    new_target_boxlist = BoxList(torch.as_tensor([i['bbox'] for i in new_target]).reshape(-1, 4), target.size, mode="xyxy")
    new_target_boxlist.add_field("labels", torch.as_tensor([i['category_id'] for i in new_target]))
    return new_target, pheso_caption, greenlight_span_for_masked_lm_objective, label_to_positions, new_target_boxlist
def convert_object_detection_to_grounding_optimized_for_od(
        target,
        image_id,
        ind_to_class,
        disable_shuffle,
        add_detection_prompt,
        add_detection_prompt_advanced,
        random_sample_negative,
        control_probabilities,
        restricted_negative_list=None,
        separation_tokens=" ",
        max_num_labels=-1,
        max_seq_length=256,
        tokenizer=None,
        positive_caption_length=0,
        od_to_grounding_version = "vanilla",
):
    '''
    ind_to_class: {0: "__background__", 1 : "person" ...}
    target:
    restricted_negative_list : for datasets with restricted negatives, sample only the negatives

    Convert object detection data into grounding data format, on the fly.

    Control options:
        1. add_detection_prompt: add "object detection : " to the front of the prompt
        2. num_negatives: randomly sampled negative classes
        3. num_positives: how many positives to keep (-1 means do not cut any)

    Probabilities to generate the control options:
        a. probability_one_negative: only give one negative class to mimic evaluation
        b. probability_one_positive: only give one positive class to mimic evaluation
        c. probability_full: add both all positive and all negatives
        d. other:
            randomly sample some negatives and some positives
            The below control options are independent of each other:
            - probability_random_negative: probability of randomly sample X negatives
            - probability_random_positive: probability of randomly sample some positives

    NEW: control version; we will have a few pre-defined control versions; and we only need to specify the version instead of all the detailed parameters
    '''
    def generate_senetence_given_labels(
            positive_label_list,
            negative_label_list,
            prompt_engineer_version="v2",
            disable_shuffle=False,
            positive_question_probability=0.6,
            negative_question_probability=0.8,
            full_question_probability=0.5):
        '''
        v3: with simple prompt such as "there are", "are there?"
        v4: try to merge some are there / there are together, to avoid sequence being too long
        '''
        # Maps each label id to its [start_char, end_char] span in the caption.
        label_to_positions = {}
        assert (prompt_engineer_version == "v2")
        num_negatives = len(negative_label_list)
        num_positives = len(positive_label_list)
        label_list = negative_label_list + positive_label_list
        if not disable_shuffle:
            random.shuffle(label_list)
        if add_detection_prompt:
            # "query" variant marks captions that contain only one side
            # (all-positive or all-negative) during shuffled training.
            if add_detection_prompt_advanced and (num_negatives == 0 or num_positives == 0) and not disable_shuffle:
                pheso_caption = "object detection query : "
            else:
                pheso_caption = "object detection : "
        else:
            pheso_caption = ""
        for index, label in enumerate(label_list):
            start_index = len(pheso_caption)
            pheso_caption += clean_name(ind_to_class[label])  # NOTE: slight change...
            end_index = len(pheso_caption)
            # e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17
            # label_to_positions: {4: (0, 3), 17: (4, 7)}
            label_to_positions[label] = [start_index, end_index]
            if index != len(label_list) - 1:
                pheso_caption += separation_tokens
        return label_to_positions, pheso_caption

    # Collect the positive (ground-truth) label ids present in this image.
    positive_label_set = set()
    for i in range(len(target)):
        label_i = target.extra_fields["labels"][i].item()
        positive_label_set.add(label_i)
    # Negative candidates: either every class or a dataset-restricted list.
    if restricted_negative_list is None:
        valid_negative_indexes = list(ind_to_class.keys())
    else:
        valid_negative_indexes = restricted_negative_list
    all_vailable_labels = positive_label_set | set(valid_negative_indexes)
    if disable_shuffle:
        # Deterministic evaluation-style caption over all classes.
        label_list = list(sorted(ind_to_class.keys()))[1:]  # do not include the background
        label_to_positions, pheso_caption = generate_senetence_given_labels(
            positive_label_list=label_list,
            negative_label_list=[],
            disable_shuffle=True)
    elif od_to_grounding_version == "random":
        # NOTE(review): `_random_od_to_grounding` is not defined in this file
        # (only _randomv1_od_to_grounding / _randomv2_od_to_grounding exist),
        # and the generate_senetence_given_labels call below omits the
        # required `negative_label_list` argument — this branch would raise
        # NameError/TypeError at runtime. Confirm whether "random" is ever
        # routed to this function (the streamlined variant handles
        # "random.v1"/"random.v2").
        # all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer
        screened_label_list = _random_od_to_grounding(
            all_labels = all_vailable_labels,
            ind_to_class = ind_to_class,
            max_seq_length = max_seq_length,
            max_num_labels = max_num_labels,
            tokenizer = tokenizer,
        )
        label_to_positions, pheso_caption = generate_senetence_given_labels(
            positive_label_list=screened_label_list)
    else:
        # "vanilla" path: probabilistic negative sampling with a token budget.
        full_positive = len(positive_label_set)
        if max_num_labels <= 0:
            full_negative = random_sample_negative
        else:
            full_negative = max(min(max_num_labels-full_positive, random_sample_negative), 0)
        if full_negative > len(valid_negative_indexes):
            full_negative = len(valid_negative_indexes)
        num_negatives, num_positives = generate_control_options_given_probabilities(
            control_probabilities=control_probabilities,
            full_positive=full_positive,
            full_negative=full_negative)
        # num_positives not used
        # Keep some negatives
        negative_label_list = set()
        if num_negatives != -1:
            if num_negatives > len(valid_negative_indexes):
                num_negatives = len(valid_negative_indexes)
            for i in np.random.choice(valid_negative_indexes, size=num_negatives, replace=False):
                # label_sets.add(i)
                # Skip candidates that are actually positives in this image.
                if i not in positive_label_set:
                    negative_label_list.add(i)
        # Keep all positives; ignoring num_positives
        positive_label_list = list(positive_label_set)
        random.shuffle(positive_label_list)
        negative_label_list = list(negative_label_list)  # e.g.: [17, 1, 13] where each number is the class name
        random.shuffle(negative_label_list)
        # Do a pre-screen. If we cannot afford this many negatives, we will sample less
        negative_max_length = max_seq_length - positive_caption_length
        screened_negative_label_list = []
        for negative_label in negative_label_list:
            label_text = clean_name(ind_to_class[negative_label]) + ". "  # "dog. "
            tokenized = tokenizer.tokenize(label_text)
            negative_max_length -= len(tokenized)
            if negative_max_length > 0:
                screened_negative_label_list.append(negative_label)  # keep this negative
            else:
                break
        negative_label_list = screened_negative_label_list
        label_to_positions, pheso_caption = generate_senetence_given_labels(
            positive_label_list=positive_label_list,
            negative_label_list=negative_label_list)
    new_target = []
    '''
    Convert into:
    {'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]}
    tokens_positive is the char position
    '''
    areas = target.area()
    greenlight_span_for_masked_lm_objective = []
    for i in range(len(target)):
        new_target_i = {}
        # NOTE(review): areas[i] is stored as a tensor scalar (no .item()).
        new_target_i["area"] = areas[i]
        new_target_i["iscrowd"] = 0
        new_target_i["image_id"] = image_id
        new_target_i["category_id"] = target.extra_fields["labels"][i].item()
        new_target_i["id"] = None
        new_target_i['bbox'] = target.bbox[i].numpy().tolist()
        label_i = target.extra_fields["labels"][i].item()
        new_target_i["original_od_label"] = label_i
        if label_i in label_to_positions:  # NOTE: Only add those that actually appear in the final caption
            new_target_i["tokens_positive"] = [label_to_positions[label_i]]
            new_target.append(new_target_i)
            greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i])
    return new_target, pheso_caption, greenlight_span_for_masked_lm_objective, label_to_positions
def generate_control_options_given_probabilities(
        control_probabilities,
        full_positive,
        full_negative):
    """Decide how many negative/positive classes to keep for one sample.

    control_probabilities = [p_one_negative, p_one_positive, p_full,
    p_drop_positive] interpreted as cumulative thresholds on a single
    uniform draw. This was originally designed to randomly drop both
    negative and positive classes; dropping positives was later disabled,
    so the returned num_positives is ignored by the caller.

    Returns:
        (num_negatives, num_positives) tuple of ints.
    """
    roll = random.random()
    p_one_negative = control_probabilities[0]
    p_one_positive = control_probabilities[1]
    p_full = control_probabilities[2]
    p_drop_positive = control_probabilities[3]
    assert(p_drop_positive == 0)
    if roll < p_one_negative:
        # a. only one negative class, mimicking evaluation (10%)
        return 1, 0
    if roll < p_one_negative + p_one_positive:
        # b. only one positive class, mimicking evaluation (10%)
        return 0, 1
    if roll < p_one_negative + p_one_positive + p_full:
        # c. keep all positives and all negatives (20%)
        return full_negative, full_positive
    # d. randomly sub-sample the negatives (the inner probability is 1.0,
    # so the "full" fallback below is dead), and keep all positives since
    # p_drop_positive is asserted to be 0.
    if random.random() < 1.0:  # - probability_random_negative: probability of randomly sample X negatives (100%)
        num_negatives = np.random.choice(max(1, full_negative)) + 1  # mininum 1
    else:
        num_negatives = full_negative  # Full
    if random.random() < p_drop_positive:  #
        num_positives = np.random.choice(max(1, full_positive)) + 1
    else:
        num_positives = full_positive  # Full
    return num_negatives, num_positives