Spaces:
Sleeping
Sleeping
# Utilities for converting object detection data into grounding data | |
import numpy as np | |
import torch | |
import pdb, json, random, re | |
from maskrcnn_benchmark.structures.bounding_box import BoxList | |
from maskrcnn_benchmark.data.datasets.tsv import load_from_yaml_file | |
from collections import defaultdict | |
from tqdm import tqdm | |
from maskrcnn_benchmark.data.datasets.parse_gpt import GPTOutputParser | |
def chunks(lst, n): | |
"""Yield successive n-sized chunks from lst.""" | |
all_ = [] | |
for i in range(0, len(lst), n): | |
data_index = lst[i:i + n] | |
all_.append(data_index) | |
counter = 0 | |
for i in all_: | |
counter += len(i) | |
assert(counter == len(lst)) | |
return all_ | |
def clean_name(name): | |
def _clean_name(name): | |
name = re.sub(r"\(.*\)", "", name) | |
name = re.sub(r"_", " ", name) | |
name = re.sub(r" ", " ", name) | |
return name | |
if ":" in name: | |
obj_name, part_name = name.split(":") | |
obj_name = _clean_name(obj_name) | |
part_name = _clean_name(part_name) | |
return part_name + " of " + obj_name | |
else: | |
return _clean_name(name) | |
def clean_string(input_string): | |
# remove leading and trailing spaces | |
input_string = input_string.strip() | |
# remove trailing ";" and "." | |
input_string = re.sub(r";$", "", input_string) | |
input_string = re.sub(r"\.$", "", input_string) | |
return input_string | |
class DetectionToGrounding(): | |
''' | |
Convert detection data into grounding data; | |
Construct prompts for training and inference; | |
''' | |
def __init__(self, version): | |
pass | |
def sanity_check_target_after_processing(target): | |
assert(len(target.bbox) == len(target.extra_fields["boxes"])) | |
def convert_od_to_grounding_simple( | |
target, | |
image_id, | |
ind_to_class, | |
disable_shuffle=True, | |
add_detection_prompt=False, | |
separation_tokens=" ", | |
caption_prompt=None): | |
""" | |
Convert object detection data into grounding data format, on the fly. | |
ind_to_class: {0: "__background__", 1 : "person" ...}, contiguous id | |
""" | |
def generate_sentence_from_labels(positive_label_list, negative_label_list, disable_shuffle=True): | |
label_to_positions = {} | |
label_list = negative_label_list + positive_label_list | |
if not disable_shuffle: | |
random.shuffle(label_list) | |
assert (caption_prompt is None), "Should not specify caption_prompt when shuffle is enabled!!" # avoid potential bug | |
if add_detection_prompt: | |
pheso_caption = "object detection : " | |
else: | |
pheso_caption = "" | |
for index, label in enumerate(label_list): | |
if caption_prompt is not None: | |
pheso_caption += caption_prompt[index]['prefix'] | |
start_index = len(pheso_caption) | |
if caption_prompt is not None: | |
pheso_caption += clean_name(caption_prompt[index]['name']) | |
else: | |
pheso_caption += clean_name(ind_to_class[label]) # NOTE: slight change... | |
end_index = len(pheso_caption) | |
if caption_prompt is not None: | |
pheso_caption += caption_prompt[index]['suffix'] | |
# e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17 | |
# label_to_positions: {4: (0, 3), 17: (4, 7)} | |
label_to_positions[label] = [start_index, end_index] | |
if index != len(label_list) - 1: | |
pheso_caption += separation_tokens | |
return label_to_positions, pheso_caption | |
label_list = list(sorted(ind_to_class.keys())) # do not include the background | |
label_to_positions, pheso_caption = generate_sentence_from_labels( | |
positive_label_list=label_list, | |
negative_label_list=[], | |
disable_shuffle=disable_shuffle | |
) | |
new_target = [] | |
''' | |
Convert into: | |
{'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]} | |
tokens_positive is the char position | |
''' | |
areas = target.area() | |
greenlight_span_for_masked_lm_objective = [] | |
for i in range(len(target)): | |
new_target_i = {} | |
new_target_i["area"] = areas[i] | |
new_target_i["iscrowd"] = 0 | |
new_target_i["image_id"] = image_id | |
new_target_i["category_id"] = target.extra_fields["labels"][i].item() | |
new_target_i["id"] = None | |
new_target_i['bbox'] = target.bbox[i].numpy().tolist() | |
label_i = target.extra_fields["labels"][i].item() | |
if label_i in label_to_positions: # NOTE: Only add those that actually appear in the final caption | |
new_target_i["tokens_positive"] = [label_to_positions[label_i]] | |
new_target.append(new_target_i) | |
greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i]) | |
return new_target, pheso_caption, greenlight_span_for_masked_lm_objective | |
def check_for_positive_overflow(target, ind_to_class, tokenizer, max_seq_length=256): | |
# NOTE: Only call this function for OD data; DO NOT USE IT FOR GROUNDING DATA | |
# NOTE: called only in coco_dt | |
# Check if we have too many positive labels | |
# generate a caption by appending the positive labels | |
positive_label_set = set() | |
for i in range(len(target)): | |
label_i = target.extra_fields["labels"][i].item() | |
positive_label_set.add(label_i) | |
positive_label_list = list(positive_label_set) | |
# random shuffule so we can sample different annotations at different epochs | |
random.shuffle(positive_label_list) | |
kept_lables = [] | |
length = 0 | |
for index, label in enumerate(positive_label_list): | |
label_text = clean_name(ind_to_class[label]) + ". " # "dog. " | |
tokenized = tokenizer.tokenize(label_text) | |
length += len(tokenized) | |
if length > max_seq_length: | |
break | |
else: | |
kept_lables.append(label) | |
## filter boxes | |
keep_box_index = [] | |
for i in range(len(target)): | |
label_i = target.extra_fields["labels"][i].item() | |
if label_i in kept_lables: | |
keep_box_index.append(i) | |
keep_box_index = torch.LongTensor(keep_box_index) | |
target = target[keep_box_index] ## filter boxes | |
return target, length | |
def _label_drop_with_length_limit(label_list, ind_to_class, length_limit, tokenizer): | |
screened_label_list = [] | |
random.shuffle(label_list) # randomly drop labels | |
for label in label_list: | |
label_text = clean_name(ind_to_class[label]) + ". " # "dog. " | |
tokenized = tokenizer.tokenize(label_text) | |
length_limit -= len(tokenized) | |
if length_limit > 0: | |
screened_label_list.append(label) # keep this label | |
else: | |
break | |
return screened_label_list | |
def _randomv1_od_to_grounding(all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer): | |
label_num = np.random.randint(1, max_num_labels) | |
selected_label_list = np.random.choice(all_labels, label_num, replace=False) | |
screened_label_list = _label_drop_with_length_limit(selected_label_list, ind_to_class, max_seq_length, tokenizer) | |
return screened_label_list | |
def _randomv2_od_to_grounding(all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer, positive_label_set): | |
full_positive = len(positive_label_set) | |
full_negative = max_num_labels - full_positive | |
outer_prob = random.random() | |
if outer_prob < 0.8: | |
num_negatives = full_negative | |
num_positives = full_positive | |
elif outer_prob < 0.9: | |
num_negatives = np.random.choice(max(1, full_negative)) + 1 # mininum 1 | |
num_positives = full_positive | |
else: | |
num_positives = np.random.choice(max(1, full_positive)) + 1 # mininum 1 | |
num_negatives = full_negative | |
# Keep some negatives | |
negative_label_list = [label for label in all_labels if label not in positive_label_set] | |
random.shuffle(negative_label_list) | |
negative_label_list = negative_label_list[:num_negatives] | |
# Keep some positives | |
positive_label_list = list(positive_label_set) | |
random.shuffle(positive_label_list) | |
positive_label_list = positive_label_list[:num_positives] | |
selected_label_list = positive_label_list + negative_label_list | |
screened_label_list = _label_drop_with_length_limit(selected_label_list, ind_to_class, max_seq_length, tokenizer) | |
return screened_label_list | |
def od_to_grounding_optimized_streamlined( | |
target, | |
image_id, | |
ind_to_class, | |
tokenizer, | |
od_to_grounding_version, | |
): | |
if od_to_grounding_version == "random.v1": | |
separation_tokens = ". " | |
max_num_labels = 85 | |
max_seq_length = 254 | |
elif od_to_grounding_version == "random.v2": | |
separation_tokens = ". " | |
max_num_labels = 60 | |
max_seq_length = 254 | |
def generate_senetence_given_labels( | |
label_list, | |
disable_shuffle=False, | |
): | |
label_to_positions = {} | |
if not disable_shuffle: | |
random.shuffle(label_list) | |
pheso_caption = "" | |
for index, label in enumerate(label_list): | |
start_index = len(pheso_caption) | |
pheso_caption += clean_name(ind_to_class[label]) # NOTE: slight change... | |
end_index = len(pheso_caption) | |
# e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17 | |
# label_to_positions: {4: (0, 3), 17: (4, 7)} | |
label_to_positions[label] = [start_index, end_index] | |
if index != len(label_list) - 1: | |
pheso_caption += separation_tokens | |
return label_to_positions, pheso_caption | |
if od_to_grounding_version == "random.v1": | |
# all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer | |
screened_label_list = _randomv1_od_to_grounding( | |
all_labels = list(ind_to_class.keys()), | |
ind_to_class = ind_to_class, | |
max_seq_length = max_seq_length, | |
max_num_labels = max_num_labels, | |
tokenizer = tokenizer, | |
) | |
label_to_positions, pheso_caption = generate_senetence_given_labels( | |
label_list=screened_label_list, ) | |
elif od_to_grounding_version == "random.v2": | |
screened_label_list = _randomv2_od_to_grounding( | |
all_labels = list(ind_to_class.keys()), | |
ind_to_class = ind_to_class, | |
max_seq_length = max_seq_length, | |
max_num_labels = max_num_labels, | |
tokenizer = tokenizer, | |
positive_label_set = set(target.extra_fields["labels"].tolist()), | |
) | |
label_to_positions, pheso_caption = generate_senetence_given_labels( | |
label_list=screened_label_list, ) | |
else: | |
raise NotImplementedError | |
new_target = [] | |
''' | |
Convert into: | |
{'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]} | |
tokens_positive is the char position | |
''' | |
areas = target.area() | |
greenlight_span_for_masked_lm_objective = [] | |
for i in range(len(target)): | |
new_target_i = {} | |
new_target_i["area"] = areas[i] | |
new_target_i["iscrowd"] = 0 | |
new_target_i["image_id"] = image_id | |
new_target_i["category_id"] = target.extra_fields["labels"][i].item() | |
new_target_i["id"] = None | |
new_target_i['bbox'] = target.bbox[i].numpy().tolist() | |
label_i = target.extra_fields["labels"][i].item() | |
new_target_i["original_od_label"] = label_i | |
if label_i in label_to_positions: # NOTE: Only add labels that actually appear in the final caption | |
new_target_i["tokens_positive"] = [label_to_positions[label_i]] | |
new_target.append(new_target_i) | |
greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i]) | |
# reconstruct the target | |
new_target_boxlist = BoxList(torch.as_tensor([i['bbox'] for i in new_target]).reshape(-1, 4), target.size, mode="xyxy") | |
new_target_boxlist.add_field("labels", torch.as_tensor([i['category_id'] for i in new_target])) | |
return new_target, pheso_caption, greenlight_span_for_masked_lm_objective, label_to_positions, new_target_boxlist | |
def convert_object_detection_to_grounding_optimized_for_od( | |
target, | |
image_id, | |
ind_to_class, | |
disable_shuffle, | |
add_detection_prompt, | |
add_detection_prompt_advanced, | |
random_sample_negative, | |
control_probabilities, | |
restricted_negative_list=None, | |
separation_tokens=" ", | |
max_num_labels=-1, | |
max_seq_length=256, | |
tokenizer=None, | |
positive_caption_length=0, | |
od_to_grounding_version = "vanilla", | |
): | |
''' | |
ind_to_class: {0: "__background__", 1 : "person" ...} | |
target: | |
restricted_negative_list : for datasets with restricted negatives, sample only the negatives | |
Convert object detection data into grounding data format, on the fly. | |
Control options: | |
1. add_detection_prompt: add "object detection : " to the front of the prompt | |
2. num_negatives: randomly sampled negative classes | |
3. num_positives: how many positives to keep (-1 means do not cut any) | |
Probabilities to generate the control options: | |
a. probability_one_negative: only give one negative class to mimic evaluation | |
b. probability_one_positive: only give one positive class to mimic evaluation | |
c. probability_full: add both all positive and all negatives | |
d. other: | |
randomly sample some negatives and some positives | |
The below control options are independent of each other: | |
- probability_random_negative: probability of randomly sample X negatives | |
- probability_random_positive: probability of randomly sample some positives | |
NEW: control version; we will have a few pre-defined control versions; and we only need to sepecify the version instead of all the detailed paratmeters | |
''' | |
def generate_senetence_given_labels( | |
positive_label_list, | |
negative_label_list, | |
prompt_engineer_version="v2", | |
disable_shuffle=False, | |
positive_question_probability=0.6, | |
negative_question_probability=0.8, | |
full_question_probability=0.5): | |
''' | |
v3: with simple prompt such as "there are", "are there?" | |
v4: try to merge some are there / there are together, to avoid sequence being too long | |
''' | |
label_to_positions = {} | |
assert (prompt_engineer_version == "v2") | |
num_negatives = len(negative_label_list) | |
num_positives = len(positive_label_list) | |
label_list = negative_label_list + positive_label_list | |
if not disable_shuffle: | |
random.shuffle(label_list) | |
if add_detection_prompt: | |
if add_detection_prompt_advanced and (num_negatives == 0 or num_positives == 0) and not disable_shuffle: | |
pheso_caption = "object detection query : " | |
else: | |
pheso_caption = "object detection : " | |
else: | |
pheso_caption = "" | |
for index, label in enumerate(label_list): | |
start_index = len(pheso_caption) | |
pheso_caption += clean_name(ind_to_class[label]) # NOTE: slight change... | |
end_index = len(pheso_caption) | |
# e.g.: pheso_caption = "cat dog", where cat is label 4, and dog is label 17 | |
# label_to_positions: {4: (0, 3), 17: (4, 7)} | |
label_to_positions[label] = [start_index, end_index] | |
if index != len(label_list) - 1: | |
pheso_caption += separation_tokens | |
return label_to_positions, pheso_caption | |
positive_label_set = set() | |
for i in range(len(target)): | |
label_i = target.extra_fields["labels"][i].item() | |
positive_label_set.add(label_i) | |
if restricted_negative_list is None: | |
valid_negative_indexes = list(ind_to_class.keys()) | |
else: | |
valid_negative_indexes = restricted_negative_list | |
all_vailable_labels = positive_label_set | set(valid_negative_indexes) | |
if disable_shuffle: | |
label_list = list(sorted(ind_to_class.keys()))[1:] # do not include the background | |
label_to_positions, pheso_caption = generate_senetence_given_labels( | |
positive_label_list=label_list, | |
negative_label_list=[], | |
disable_shuffle=True) | |
elif od_to_grounding_version == "random": | |
# all_labels, ind_to_class, max_seq_length, max_num_labels, tokenizer | |
screened_label_list = _random_od_to_grounding( | |
all_labels = all_vailable_labels, | |
ind_to_class = ind_to_class, | |
max_seq_length = max_seq_length, | |
max_num_labels = max_num_labels, | |
tokenizer = tokenizer, | |
) | |
label_to_positions, pheso_caption = generate_senetence_given_labels( | |
positive_label_list=screened_label_list) | |
else: | |
full_positive = len(positive_label_set) | |
if max_num_labels <= 0: | |
full_negative = random_sample_negative | |
else: | |
full_negative = max(min(max_num_labels-full_positive, random_sample_negative), 0) | |
if full_negative > len(valid_negative_indexes): | |
full_negative = len(valid_negative_indexes) | |
num_negatives, num_positives = generate_control_options_given_probabilities( | |
control_probabilities=control_probabilities, | |
full_positive=full_positive, | |
full_negative=full_negative) | |
# num_positives not used | |
# Keep some negatives | |
negative_label_list = set() | |
if num_negatives != -1: | |
if num_negatives > len(valid_negative_indexes): | |
num_negatives = len(valid_negative_indexes) | |
for i in np.random.choice(valid_negative_indexes, size=num_negatives, replace=False): | |
# label_sets.add(i) | |
if i not in positive_label_set: | |
negative_label_list.add(i) | |
# Keep all positives; ignoring num_positives | |
positive_label_list = list(positive_label_set) | |
random.shuffle(positive_label_list) | |
negative_label_list = list(negative_label_list) # e.g.: [17, 1, 13] where each number is the class name | |
random.shuffle(negative_label_list) | |
# Do a pre-screen. If we cannot afford this many negatives, we will sample less | |
negative_max_length = max_seq_length - positive_caption_length | |
screened_negative_label_list = [] | |
for negative_label in negative_label_list: | |
label_text = clean_name(ind_to_class[negative_label]) + ". " # "dog. " | |
tokenized = tokenizer.tokenize(label_text) | |
negative_max_length -= len(tokenized) | |
if negative_max_length > 0: | |
screened_negative_label_list.append(negative_label) # keep this negative | |
else: | |
break | |
negative_label_list = screened_negative_label_list | |
label_to_positions, pheso_caption = generate_senetence_given_labels( | |
positive_label_list=positive_label_list, | |
negative_label_list=negative_label_list) | |
new_target = [] | |
''' | |
Convert into: | |
{'area': 10506.0, 'iscrowd': 0, 'image_id': 571335, 'category_id': 1, 'id': 2999421, 'bbox': [221, 319, 103, 102], 'tokens_positive': [[0, 3]]} | |
tokens_positive is the char position | |
''' | |
areas = target.area() | |
greenlight_span_for_masked_lm_objective = [] | |
for i in range(len(target)): | |
new_target_i = {} | |
new_target_i["area"] = areas[i] | |
new_target_i["iscrowd"] = 0 | |
new_target_i["image_id"] = image_id | |
new_target_i["category_id"] = target.extra_fields["labels"][i].item() | |
new_target_i["id"] = None | |
new_target_i['bbox'] = target.bbox[i].numpy().tolist() | |
label_i = target.extra_fields["labels"][i].item() | |
new_target_i["original_od_label"] = label_i | |
if label_i in label_to_positions: # NOTE: Only add those that actually appear in the final caption | |
new_target_i["tokens_positive"] = [label_to_positions[label_i]] | |
new_target.append(new_target_i) | |
greenlight_span_for_masked_lm_objective.append(label_to_positions[label_i]) | |
return new_target, pheso_caption, greenlight_span_for_masked_lm_objective, label_to_positions | |
def generate_control_options_given_probabilities( | |
control_probabilities, | |
full_positive, | |
full_negative): | |
# The function was originally designed to perform data augmentation by randomly dropping negative and positive classes. Later, we decided to only consider dropping negative classes. So the returned 'num_positives' by this function will be ignored. | |
outer_prob = random.random() | |
probability_one_negative = control_probabilities[0] | |
probability_one_positive = control_probabilities[1] | |
probability_full = control_probabilities[2] | |
probability_drop_positive = control_probabilities[3] | |
assert(probability_drop_positive == 0) | |
if outer_prob < probability_one_negative: | |
# a. probability_one_negative: only give one negative class to mimic evaluation (10%) | |
num_negatives = 1 | |
num_positives = 0 | |
elif outer_prob < probability_one_positive + probability_one_negative: | |
# b. probability_one_positive: only give one positive class to mimic evaluation (10%) | |
num_negatives = 0 | |
num_positives = 1 | |
elif outer_prob < probability_full + probability_one_positive + probability_one_negative: | |
# c. probability_full: add both all positive and all negatives (20%) | |
num_negatives = full_negative | |
num_positives = full_positive | |
else: | |
if random.random() < 1.0: # - probability_random_negative: probability of randomly sample X negatives (100%) | |
num_negatives = np.random.choice(max(1, full_negative)) + 1 # mininum 1 | |
else: | |
num_negatives = full_negative # Full | |
if random.random() < probability_drop_positive: # | |
num_positives = np.random.choice(max(1, full_positive)) + 1 | |
else: | |
num_positives = full_positive # Full | |
return num_negatives, num_positives | |