Luuu / frame_field_learning /local_utils.py
白鹭先生
init
abd2a81
raw
history blame
6.9 kB
import functools
import os
import subprocess
import sys
import time
from lydorn_utils import run_utils, print_utils
from lydorn_utils import python_utils
def compute_max_disp(disp_params):
    """
    Compute the maximum total displacement allowed by the displacement parameters.

    The result is the sum of a global term and a per-polygon term. Each term is
    a translation bound plus the matching homography bound scaled by
    "max_homography_coef".

    :param disp_params: Mapping with the keys "max_global_translation",
        "max_global_homography", "max_poly_translation", "max_poly_homography"
        and "max_homography_coef".
    :return: (global translation + coef * global homography)
        + (poly translation + coef * poly homography)
    """
    key_order = (
        "max_global_translation",
        "max_global_homography",
        "max_poly_translation",
        "max_poly_homography",
        "max_homography_coef",
    )
    global_t, global_h, poly_t, poly_h, homography_coef = (disp_params[key] for key in key_order)
    global_disp = global_t + homography_coef*global_h
    poly_disp = poly_t + homography_coef*poly_h
    return global_disp + poly_disp
def get_git_revision_hash():
    """
    Return the hash of the commit currently checked out (git HEAD).

    Runs ``git rev-parse HEAD`` in the current working directory of the process.

    :return: The commit hash as a string, or None when it cannot be determined:
        either git exited with a non-zero status, or the git executable could
        not be launched at all (e.g. git is not installed).
    """
    try:
        output = subprocess.check_output(['git', 'rev-parse', 'HEAD'], stderr=subprocess.STDOUT)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: git ran but reported a failure.
        # OSError (e.g. FileNotFoundError): git could not be started.
        return None
    # strip() drops the trailing line ending, whether it is "\n" or "\r\n".
    return output.decode("utf-8").strip()
def setup_run(config):
    """
    Set up the directory of a run and record the run's metadata inside it.

    In order: resolves the checkpoints directory of the initialization run (if
    any), sets up the run directory, saves the config, saves a few arguments to
    "args.json" and appends the current git commit hash to
    "commit_history.json" when it differs from the last recorded one.

    :param config: Dictionary. Keys read here: "run_name", "new_run",
        "init_run_name", "runs_dirpath", "optim_params" (its "batch_size"
        entry) and, optionally, "samples".
    :return: Tuple (run_dirpath, init_checkpoints_dirpath). The second item is
        None when config["init_run_name"] is None.
    """
    run_name = config["run_name"]
    new_run = config["new_run"]
    init_run_name = config["init_run_name"]

    # The runs directory is resolved from the directory containing this file.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    runs_dir = os.path.join(module_dir, config["runs_dirpath"])

    # Checkpoints directory of the run to initialize from, if one is specified:
    init_checkpoints_dirpath = None
    if init_run_name is not None:
        init_run_dirpath = run_utils.setup_run_dir(runs_dir, init_run_name)
        _, init_checkpoints_dirpath = run_utils.setup_run_subdirs(init_run_dirpath)

    # Directory of the run itself:
    run_dirpath = run_utils.setup_run_dir(runs_dir, run_name, new_run)

    # Keep a copy of the config in the run directory.
    run_utils.save_config(config, run_dirpath)

    # Keep the main arguments of the run next to it.
    args_filepath = os.path.join(run_dirpath, "args.json")
    args_to_save = {
        "run_name": run_name,
        "new_run": new_run,
        "init_run_name": init_run_name,
        "batch_size": config["optim_params"]["batch_size"],
    }
    if "samples" in config:
        args_to_save["samples"] = config["samples"]
    python_utils.save_json(args_filepath, args_to_save)

    # Track which commits this run has been executed with.
    commit_hash = get_git_revision_hash()
    if commit_hash is None:
        return run_dirpath, init_checkpoints_dirpath

    history_filepath = os.path.join(run_dirpath, "commit_history.json")
    if not os.path.exists(history_filepath):
        # First recorded commit for this run.
        python_utils.save_json(history_filepath, [commit_hash])
    else:
        history = python_utils.load_json(history_filepath)
        # Only write when the commit changed since the last recorded entry.
        if history[-1] != commit_hash:
            history.append(commit_hash)
            python_utils.save_json(history_filepath, history)
    return run_dirpath, init_checkpoints_dirpath
def get_run_dirpath(runs_dirpath, run_name):
    """
    Return the directory of an existing run, exiting the program if it is missing.

    :param runs_dirpath: Path of the runs directory. It is joined to the
        directory containing this file with os.path.join, so an absolute path
        is used as is.
    :param run_name: Name of the run to look up.
    :return: The value returned by run_utils.setup_run_dir(..., check_exists=True).
    :raises SystemExit: With exit status 1 when run_utils.setup_run_dir raises
        FileNotFoundError (the error is printed first).
    """
    working_dir = os.path.dirname(os.path.abspath(__file__))
    runs_dir = os.path.join(working_dir, runs_dirpath)
    try:
        return run_utils.setup_run_dir(runs_dir, run_name, check_exists=True)
    except FileNotFoundError as e:
        print_utils.print_error(f"ERROR: {e}")
        # Exit with a non-zero status: a bare sys.exit() would report success (0)
        # to the calling shell/script even though the lookup failed.
        sys.exit(1)
def batch_to_cuda(batch):
    """
    Send every item of the batch that has a ``cuda`` attribute to the GPU.

    The batch is modified in place: each such item is replaced by the result of
    ``item.cuda(non_blocking=True)``. Other items are left untouched.

    :param batch: Dictionary of batch items.
    :return: The same batch object.
    """
    # Iterate over a snapshot so entries can be reassigned while looping.
    for name, value in tuple(batch.items()):
        if not hasattr(value, "cuda"):
            continue
        batch[name] = value.cuda(non_blocking=True)
    return batch
def batch_to_cpu(batch):
    """
    Bring every item of the batch that has a ``cpu`` method back to the CPU.

    The batch is modified in place: each such item is replaced by the result of
    ``item.cpu()``. Other items are left untouched.

    :param batch: Dictionary of batch items.
    :return: The same batch object.
    """
    for key, item in batch.items():
        # Guard on the method that is actually called (the check used to look
        # for "cuda", which does not guarantee that "cpu" exists).
        if hasattr(item, "cpu"):
            batch[key] = item.cpu()
    return batch
def split_batch(tile_data):
    """
    Split a batched dictionary into a list of per-sample dictionaries.

    The number of samples is taken from the first dimension of
    tile_data["image"]. Sample i receives ``item[i]`` for every key of
    tile_data.

    :param tile_data: Dictionary of batched items. tile_data["image"] must have
        a 4-dimensional shape (N, C, H, W).
    :return: List of N dictionaries with the same keys as tile_data.
    """
    assert len(tile_data["image"].shape) == 4, "tile_data[\"image\"] should be (N, C, H, W)"
    sample_count = tile_data["image"].shape[0]
    samples = []
    for index in range(sample_count):
        sample = {}
        for name, values in tile_data.items():
            if len(values) <= index:
                # Debug aid: report which item is too short before the
                # indexing below fails.
                print(name, len(values))
            sample[name] = values[index]
        samples.append(sample)
    return samples
def _concat_dictionaries(dict1, dict2):
"""
Recursive concat dictionaries. Dict 1 and Dict 2 must have the same key hierarchy (this is not checked).
:param dict1: Dictionary to add to.
:param dict2: Dictionary to add from
:return: Merged dictionary dict1
"""
for key in dict1.keys():
item1 = dict1[key]
item2 = dict2[key]
if isinstance(item1, dict): # And item2 is dict too.
dict1[key] = _concat_dictionaries(item1, item2)
else:
dict1[key].extend(item2)
return dict1
def _root_concat_dictionaries(dict1, dict2):
    """
    Same as _concat_dictionaries, but also prints the time the merge took.

    :param dict1: Dictionary to add to.
    :param dict2: Dictionary to add from.
    :return: Merged dictionary, as returned by _concat_dictionaries.
    """
    start = time.time()
    merged = _concat_dictionaries(dict1, dict2)
    elapsed = time.time() - start
    print(f"_root_concat_dictionaries: {elapsed:02}s")
    return merged
def list_of_dicts_to_dict_of_lists(list_of_dicts):
    """
    Merge dictionaries sharing the same key hierarchy into a single dictionary of lists.

    Works recursively by folding the dictionaries with _concat_dictionaries,
    which is itself recursive. Note that the first dictionary is used as the
    accumulator: it is extended in place and is the object that gets returned.

    :param list_of_dicts: Iterable of dictionaries with the same key hierarchy.
    :return: dict_of_lists. An empty input yields an empty dictionary.
    """
    dicts = iter(list_of_dicts)
    try:
        first = next(dicts)
    except StopIteration:
        # functools.reduce raises TypeError on an empty iterable when no
        # initial value is given; nothing to merge means an empty result.
        return {}
    return functools.reduce(_concat_dictionaries, dicts, first)
def flatten_dict(_dict):
    """
    Flatten a hierarchy of nested dictionaries into a single-level dictionary.

    The key of a nested value is the chain of keys leading to it, joined with
    "." (e.g. {"a": {"b": 1}} becomes {"a.b": 1}). An empty nested dictionary
    contributes no entry.

    :param _dict: Dictionary, possibly containing nested dictionaries.
    :return: New flat dictionary; the input is not modified.
    """
    flat = {}
    for key, item in _dict.items():
        if not isinstance(item, dict):
            flat[key] = item
            continue
        # Flatten the sub-dictionary first, then prefix its keys with ours.
        for sub_key, sub_item in flatten_dict(item).items():
            flat[key + "." + sub_key] = sub_item
    return flat
def _generate_list_of_dicts(list_length, methods_count, submethods_count, annotation_count, segmentation_length):
list_of_dicts = []
for i in range(list_length):
d = {}
for method_i in range(methods_count):
d[f"method_{method_i}"] = {}
for submethod_i in range(submethods_count):
d[f"method_{method_i}"][f"submethod_{submethod_i}"] = []
for annotation_i in range(annotation_count):
annotation = {
"image_id": 0,
"segmentation": [list(range(segmentation_length))],
"category_id": 100, # Building
"bbox": [0, 1, 0, 1],
"score": 1.0
}
d[f"method_{method_i}"][f"submethod_{submethod_i}"].append(annotation)
list_of_dicts.append(d)
return list_of_dicts
def main():
    """
    Time list_of_dicts_to_dict_of_lists on generated data, then flatten the result.

    The input is a list of dictionaries sharing one key hierarchy, for example:
        [
            {"method1": {"submethod1": [[0, 1, 2, 3], [4, 5, 6]]}},
            {"method1": {"submethod1": [[7, 8, 9], [10, 11, 12, 13, 14, 15]]}},
        ]
    """
    # Generation of the synthetic input
    start = time.time()
    list_of_dicts = _generate_list_of_dicts(
        list_length=2000,
        methods_count=2,
        submethods_count=2,
        annotation_count=100,
        segmentation_length=200,
    )
    elapsed = time.time() - start
    print(f"_generate_list_of_dicts: {elapsed:02}s")

    # Conversion under test
    start = time.time()
    dict_of_lists = list_of_dicts_to_dict_of_lists(list_of_dicts)
    elapsed = time.time() - start
    print(f"list_of_dicts_to_dict_of_lists: {elapsed:02}s")

    # The flattened result is not used further; this only exercises flatten_dict.
    flatten_dict(dict_of_lists)


# Only run the benchmark when this file is executed as a script.
if __name__ == "__main__":
    main()