Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import random | |
| import pickle | |
| import shutil | |
def load_pickle_and_assign_split(pkl_dir, split):
    """Load ``OCL_annot_<split>.pkl`` from *pkl_dir* and tag each record.

    Args:
        pkl_dir: Directory that contains the annotation pickle.
        split: Split name; used both to build the file name and as the value
            stored under the ``'split'`` key of every record.

    Returns:
        The unpickled object. Every element yielded by iterating it has been
        mutated in place so that ``element['split'] == split``.

    Raises:
        FileNotFoundError: If the annotation file does not exist.
    """
    pkl_path = os.path.join(pkl_dir, f"OCL_annot_{split}.pkl")
    # NOTE: pickle.load can execute arbitrary code while deserialising;
    # only point this at annotation files from a trusted source.
    with open(pkl_path, "rb") as fp:
        records = pickle.load(fp)
    for record in records:
        record["split"] = split
    return records
def load_class_json(
    name,
    resource_dir="/home/lixingyu/workspace_lxy2/Data/OCL_data/data/resources",
):
    """Load and return the parsed contents of ``OCL_class_<name>.json``.

    Args:
        name: Class-list name inserted into the file name (the script uses
            ``"attribute"`` and ``"affordance"``).
        resource_dir: Directory containing the JSON file. Defaults to the
            location that used to be hard-coded in the function body, so
            existing one-argument calls behave as before.

    Returns:
        Whatever ``json.load`` produces for the file.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
    """
    json_path = os.path.join(resource_dir, f"OCL_class_{name}.json")
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(json_path, "r", encoding="utf-8") as fp:
        return json.load(fp)
# Root of the OCL data; annotation ``name`` fields are joined onto this path.
DATA_ROOT = "/home/lixingyu/workspace_lxy2/Data/OCL_data/data"
# Directory holding the class JSON files and annotation pickles; the selection
# pickles produced by this script are written here as well.
pkl_dir = "/home/lixingyu/workspace_lxy2/Data/OCL_data/data/resources"
# Destination tree for the images copied during the affordance pass.
SAVED_TEST_DIR = os.path.join(DATA_ROOT, "saved_test_data")
# Objects whose bounding-box area is not strictly above this are ignored.
MIN_BOX_AREA = 50000


def _with_unique_suffix(base, taken):
    """Return ``<base>_<n>`` for the smallest ``n >= 1`` not present in *taken*."""
    n = 1
    while f"{base}_{n}" in taken:
        n += 1
    return f"{base}_{n}"


def build_affordance_names(aff_dict, rng=random):
    """Choose one unique display name per affordance entry.

    ``names[i]`` always names ``aff_dict[i]``. The first word of an entry is
    preferred. When that word is already taken:

    * an entry with several words gets a random word of its own that is still
      unused;
    * a single-word entry keeps its word and the earlier holder of that word
      is renamed to one of the holder's unused words;
    * if neither is possible, a numeric suffix (``word_1``, ``word_2``, ...)
      is appended.

    The earlier implementation only rejected the first word when re-drawing,
    so it could emit duplicate names; duplicates silently collapse to a
    single key when the name -> index dict is built.

    Args:
        aff_dict: Indexable sequence of mappings, each with a non-empty
            ``"word"`` list.
        rng: Object providing ``choice(seq)``. Defaults to the ``random``
            module; pass a seeded ``random.Random`` for reproducible names.

    Returns:
        A list of unique strings, one per entry, in entry order.
    """
    names = []
    for entry in aff_dict:
        words = entry["word"]
        first = words[0]
        if first not in names:
            names.append(first)
            continue

        if len(words) > 1:
            unused = [word for word in words if word not in names]
            if unused:
                names.append(rng.choice(unused))
            else:
                names.append(_with_unique_suffix(first, names))
            continue

        # Single-word entry: try to hand it the contested word by renaming
        # the entry that currently holds it.
        holder = names.index(first)
        replacements = [
            word for word in aff_dict[holder]["word"] if word not in names
        ]
        if replacements:
            names[holder] = rng.choice(replacements)
            names.append(first)
        else:
            names.append(_with_unique_suffix(first, names))
    return names


def collect_selected_objects(
    test_pkl,
    label_key,
    name_2_idx,
    selected_names,
    paired_names,
    data_root,
    negative_ids=None,
    min_box_area=MIN_BOX_AREA,
    copy_dir=None,
):
    """Bucket large-enough objects by the selected labels they carry.

    Only items whose image exists under *data_root* are visited, and only
    objects whose box area ``|x0 - x2| * |y0 - y2|`` exceeds *min_box_area*.

    Args:
        test_pkl: Sequence of items with ``"name"`` and ``"objects"`` keys;
            each object has a ``"box"`` (four numbers) and a *label_key*
            collection of label indices.
        label_key: Object key holding the label indices (``"aff"`` or
            ``"attr"`` in this script).
        name_2_idx: Mapping from label name to label index.
        selected_names: Label names to collect individually.
        paired_names: ``"a-b"`` strings naming label pairs.
        data_root: Directory the item names are relative to.
        negative_ids: If given, an object without any selected label counts
            as negative only when the number of its distinct labels that are
            in this collection equals ``len(labels)``. If ``None``, every
            object without a selected label is negative.
        min_box_area: Exclusive lower bound on the box area.
        copy_dir: If given, each visited image is copied to the same relative
            path under this directory.

    Returns:
        ``(selected, paired, negatives)`` where ``selected`` maps a label
        name to ``[data_idx, obj_idx]`` pairs, ``paired`` maps a pair string
        to a dict keyed by the pair string (both labels present) or by the
        single label present, and ``negatives`` is a list of
        ``[data_idx, obj_idx]`` pairs.
    """
    selected_ids = {name_2_idx[name] for name in selected_names}
    if negative_ids is not None:
        negative_ids = set(negative_ids)

    selected = {}
    paired = {}
    negatives = []
    for data_idx, item in enumerate(test_pkl):
        image_path = os.path.join(data_root, item["name"])
        if not os.path.exists(image_path):
            continue

        for obj_idx, obj in enumerate(item["objects"]):
            box = obj["box"]
            if abs(box[0] - box[2]) * abs(box[1] - box[3]) <= min_box_area:
                continue

            labels = obj[label_key]
            label_set = set(labels)
            if label_set & selected_ids:
                for name in selected_names:
                    if name_2_idx[name] in label_set:
                        selected.setdefault(name, []).append([data_idx, obj_idx])
                for pair in paired_names:
                    first, second = pair.split("-")
                    has_first = name_2_idx[first] in label_set
                    has_second = name_2_idx[second] in label_set
                    if has_first and has_second:
                        bucket = pair
                    elif has_first:
                        bucket = first
                    elif has_second:
                        bucket = second
                    else:
                        continue
                    paired.setdefault(pair, {}).setdefault(bucket, []).append(
                        [data_idx, obj_idx]
                    )
            elif negative_ids is None or len(label_set & negative_ids) == len(labels):
                negatives.append([data_idx, obj_idx])

        if copy_dir is not None:
            # NOTE(review): the source this was recovered from had lost its
            # indentation, so whether the copy ran once per existing image or
            # once per qualifying object cannot be determined. Once per
            # existing image is assumed here -- confirm.
            target = os.path.join(copy_dir, item["name"])
            os.makedirs(os.path.dirname(target), exist_ok=True)
            shutil.copy(image_path, target)
    return selected, paired, negatives


def report_selection(kind, selected, paired, negatives):
    """Print how many objects landed in every bucket of one selection pass."""
    print("negative_pkl has {} objects".format(len(negatives)))
    for name, refs in selected.items():
        print(f"selected {name} {kind} has {len(refs)} objects")
    for pair, buckets in paired.items():
        for sub_name, refs in buckets.items():
            print(f"selected {pair} paired {kind} {sub_name} has {len(refs)} objects")


def main():
    """Build the affordance and attribute selection pickles.

    Writes ``OCL_selected_test_affordance.pkl`` and
    ``OCL_selected_test_attribute.pkl`` into ``pkl_dir`` and copies the
    visited images into ``SAVED_TEST_DIR``.
    """
    attrs = load_class_json("attribute")
    aff_dict = load_class_json("affordance")
    aff = build_affordance_names(aff_dict)
    print(len(attrs))
    # A leftover debugging exit() used to follow the print above and made the
    # rest of the script unreachable; it has been removed.

    attrs_2_idx = {attr_item: idx for idx, attr_item in enumerate(attrs)}
    aff_2_idx = {aff_item: idx for idx, aff_item in enumerate(aff)}

    # test_pkl was previously referenced without ever being assigned.
    # NOTE(review): the "test" split is assumed from the output file names
    # (OCL_selected_test_*.pkl) -- confirm.
    test_pkl = load_pickle_and_assign_split(pkl_dir, "test")

    # ---- affordances ----
    selected_affordance = [
        'break', 'carry', 'clean', 'close', 'cut',
        'eat', 'open', 'push', 'sit', 'write',
    ]
    selected_paired_affs = [
        'sit-write', 'push-carry', 'cut-clean', 'open-break', 'cut-close',
    ]
    # An affordance is "negative" when none of the selected words occurs in
    # its word list. Entry i is named aff[i], so its index is i.
    negative_aff_id = {
        aff_idx
        for aff_idx, aff_item in enumerate(aff_dict)
        if not any(sel_aff in aff_item["word"] for sel_aff in selected_affordance)
    }
    selected_pkl, selected_paired_pkl, negative_pkl = collect_selected_objects(
        test_pkl,
        "aff",
        aff_2_idx,
        selected_affordance,
        selected_paired_affs,
        DATA_ROOT,
        negative_ids=negative_aff_id,
        copy_dir=SAVED_TEST_DIR,
    )
    report_selection("affordance", selected_pkl, selected_paired_pkl, negative_pkl)
    ocl_test_aff_pkl = {
        "selected_individual_pkl": selected_pkl,
        "selected_paired_pkl": selected_paired_pkl,
        "negative_pkl": negative_pkl,
    }
    with open(os.path.join(pkl_dir, "OCL_selected_test_affordance.pkl"), "wb") as fp:
        pickle.dump(ocl_test_aff_pkl, fp)

    # ---- attributes ----
    selected_attrs = [
        'wooden', 'metal', 'flying', 'ripe', 'fresh',
        'natural', 'cooked', 'painted', 'rusty', 'furry',
    ]
    selected_paired_attrs = [
        "furry-metal", "fresh-cooked", "natural-ripe", "painted-rusty",
    ]
    # Every large object without a selected attribute is a negative here.
    selected_pkl, selected_paired_pkl, negative_pkl = collect_selected_objects(
        test_pkl,
        "attr",
        attrs_2_idx,
        selected_attrs,
        selected_paired_attrs,
        DATA_ROOT,
    )
    print("finshed!!!")
    report_selection("attribute", selected_pkl, selected_paired_pkl, negative_pkl)
    ocl_test_attr_pkl = {
        "selected_individual_pkl": selected_pkl,
        "selected_paired_pkl": selected_paired_pkl,
        "negative_pkl": negative_pkl,
    }
    with open(os.path.join(pkl_dir, "OCL_selected_test_attribute.pkl"), "wb") as fp:
        pickle.dump(ocl_test_attr_pkl, fp)


if __name__ == "__main__":
    main()