"""Generate random-stroke masks as independent images or per-video sequences.

After (or instead of) generation the script can produce a copy of the masks
whose border band is erased, and can group masks into sub-directories by
their masked-area ratio.
"""
import os
import sys
import shutil
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))  # NOQA
import numpy as np
import argparse
from PIL import Image

from .mask_generators import get_video_masks_by_moving_random_stroke, get_masked_ratio
from .util import make_dirs, make_dir_under_root, get_everything_under
from .readers import MaskReader


def parse_args():
    """Parse the command-line options of the mask generation script.

    Returns:
        argparse.Namespace: the parsed options. Options without a default
        (e.g. ``output_dir``, ``n``, ``video_len``) are ``None`` when omitted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-od', '--output_dir',
        type=str,
        help="Output directory name"
    )
    parser.add_argument(
        '-im', '--image_masks',
        action='store_true',
        help="Set this if you want to generate independent masks in one directory."
    )
    parser.add_argument(
        '-vl', '--video_len',
        type=int,
        help="Maximum video length (i.e. #mask)"
    )
    parser.add_argument(
        '-ns', '--num_stroke',
        type=int,
        help="Number of stroke in one mask"
    )
    parser.add_argument(
        '-nsb', '--num_stroke_bound',
        type=int,
        nargs=2,
        # The two values are forwarded as (low, high) to np.random.randint,
        # so the lower bound comes first and the upper bound is exclusive.
        help="Lower (inclusive) and upper (exclusive) bound of number of stroke in one mask"
    )
    parser.add_argument(
        '-n',
        type=int,
        help="Number of mask to generate"
    )
    parser.add_argument(
        '-sp', '--stroke_preset',
        type=str,
        default='rand_curve',
        help="Preset of the stroke parameters"
    )
    parser.add_argument(
        '-iw', '--image_width',
        type=int,
        default=320
    )
    parser.add_argument(
        '-ih', '--image_height',
        type=int,
        default=180
    )
    parser.add_argument(
        '--cluster_by_area',
        action='store_true'
    )
    parser.add_argument(
        '--leave_boarder_unmasked',
        type=int,
        help='Set this to a number, then a copy of the mask where the mask of boarder is erased.'
    )
    parser.add_argument(
        '--redo_without_generation',
        action='store_true',
        help='Set this, and the script will skip the generation and redo the left tasks '
             '(uncluster -> erase boarder -> re-cluster)'
    )
    args = parser.parse_args()
    return args


def get_stroke_preset(stroke_preset):
    """Return the stroke parameters registered under a preset name.

    The returned dict is unpacked as keyword arguments into
    ``get_video_masks_by_moving_random_stroke`` by ``main``, so its keys must
    stay exactly as spelled here.

    Args:
        stroke_preset: one of ``'object_like'``, ``'object_like_middle'``,
            ``'object_like_small'``, ``'rand_curve'``, ``'rand_curve_small'``.

    Returns:
        dict: a freshly built parameter dict for the preset.

    Raises:
        NotImplementedError: if the preset name is unknown.
    """
    if stroke_preset == 'object_like':
        return {
            "nVertexBound": [5, 30],
            "maxHeadSpeed": 15,
            "maxHeadAcceleration": (10, 1.5),
            "brushWidthBound": (20, 50),
            "nMovePointRatio": 0.5,
            "maxPiontMove": 10,
            "maxLineAcceleration": (5, 0.5),
            "boarderGap": None,
            "maxInitSpeed": 10,
        }
    elif stroke_preset == 'object_like_middle':
        return {
            "nVertexBound": [5, 15],
            "maxHeadSpeed": 8,
            "maxHeadAcceleration": (4, 1.5),
            "brushWidthBound": (20, 50),
            "nMovePointRatio": 0.5,
            "maxPiontMove": 5,
            "maxLineAcceleration": (5, 0.5),
            "boarderGap": None,
            "maxInitSpeed": 10,
        }
    elif stroke_preset == 'object_like_small':
        return {
            "nVertexBound": [5, 20],
            "maxHeadSpeed": 7,
            "maxHeadAcceleration": (3.5, 1.5),
            "brushWidthBound": (10, 30),
            "nMovePointRatio": 0.5,
            "maxPiontMove": 5,
            "maxLineAcceleration": (3, 0.5),
            "boarderGap": None,
            "maxInitSpeed": 4,
        }
    elif stroke_preset == 'rand_curve':
        return {
            "nVertexBound": [10, 30],
            "maxHeadSpeed": 20,
            "maxHeadAcceleration": (15, 0.5),
            "brushWidthBound": (3, 10),
            "nMovePointRatio": 0.5,
            "maxPiontMove": 3,
            "maxLineAcceleration": (5, 0.5),
            "boarderGap": None,
            "maxInitSpeed": 6
        }
    elif stroke_preset == 'rand_curve_small':
        return {
            "nVertexBound": [6, 22],
            "maxHeadSpeed": 12,
            "maxHeadAcceleration": (8, 0.5),
            "brushWidthBound": (2.5, 5),
            "nMovePointRatio": 0.5,
            "maxPiontMove": 1.5,
            "maxLineAcceleration": (3, 0.5),
            "boarderGap": None,
            "maxInitSpeed": 3
        }
    else:
        raise NotImplementedError(f'The stroke presetting "{stroke_preset}" does not exist.')


def copy_masks_without_boarder(root_dir, args):
    """Copy ``root_dir`` to ``root_dir + '_noBoarder'`` and erase each mask's border.

    Every mask in the copy has a band of ``args.leave_boarder_unmasked`` pixels
    along all four edges set to 255 and is saved back in place as a mode ``'1'``
    image. The original directory is left untouched.

    Args:
        root_dir: directory holding mask files (``args.image_masks`` set) or
            sub-directories of ``*.png`` mask files (otherwise).
        args: parsed options; reads ``image_masks`` and ``leave_boarder_unmasked``.

    Returns:
        str: path of the border-erased copy.

    Raises:
        ValueError: if ``args.leave_boarder_unmasked`` is negative.
        FileExistsError: raised by ``shutil.copytree`` when the destination
            directory already exists.
    """
    gap = args.leave_boarder_unmasked
    if gap < 0:
        raise ValueError('"--leave_boarder_unmasked" must be non-negative')

    def erase_mask_boarder(mask, gap):
        # NOTE(review): the "* 255" scaling assumes a binary (mode '1') mask whose
        # array values are 0/1 -- confirm against the mask generator.
        pix = np.asarray(mask).astype('uint8') * 255
        # With gap == 0 the slice "-gap:" would be "-0:", i.e. the whole array,
        # and the entire mask would be wiped; only erase for a positive gap.
        if gap > 0:
            pix[:gap, :] = 255
            pix[-gap:, :] = 255
            pix[:, :gap] = 255
            pix[:, -gap:] = 255
        return Image.fromarray(pix).convert('1')

    def rewrite_mask_in_place(path):
        # Close the source file before overwriting it with the erased version.
        with Image.open(path) as mask:
            mask_wo_boarder = erase_mask_boarder(mask, gap)
        mask_wo_boarder.save(path)

    wo_boarder_dir = root_dir + '_noBoarder'
    shutil.copytree(root_dir, wo_boarder_dir)

    for filename in get_everything_under(wo_boarder_dir):
        if args.image_masks:
            rewrite_mask_in_place(filename)
        else:
            # filename is a directory containing multiple mask files
            for f in get_everything_under(filename, pattern='*.png'):
                rewrite_mask_in_place(f)

    return wo_boarder_dir


def _masked_ratio_of_file(path):
    """Open the mask at ``path``, return ``get_masked_ratio`` of it, and close the file."""
    with Image.open(path) as mask:
        return get_masked_ratio(mask)


def cluster_by_masked_area(root_dir, args):
    """Regroup the masks under ``root_dir`` into sub-directories by masked ratio.

    Buckets are centred at 5, 15, ..., 95 (percent) and each covers
    ``centre +- 5``; a value lying exactly on a shared boundary goes to the
    lower bucket because the first matching bucket wins. The clustered tree is
    built next to ``root_dir`` (suffix ``'_clustered'``) and then replaces it.

    NOTE: an entry whose ratio matches no bucket (e.g. ``nan`` from a directory
    without ``*.png`` files) is not moved and is deleted together with the old
    ``root_dir``.

    Args:
        root_dir: directory holding mask files (``args.image_masks`` set) or
            sub-directories of ``*.png`` mask files (otherwise).
        args: parsed options; reads ``image_masks``.
    """
    clustered_dir = root_dir + '_clustered'
    make_dirs(clustered_dir)
    radius = 5

    # all masks with ratio in x +- radius will be stored in sub-directory x
    clustered_centors = np.arange(radius, 100, radius * 2)
    # make sub-directories for each ratio range
    clustered_subdirs = [
        make_dir_under_root(clustered_dir, str(c)) for c in clustered_centors
    ]

    for filename in get_everything_under(root_dir):
        if args.image_masks:
            ratio = _masked_ratio_of_file(filename)
        else:
            # filename is a directory containing multiple mask files
            ratio = np.mean([
                _masked_ratio_of_file(f)
                for f in get_everything_under(filename, pattern='*.png')
            ])

        # find the first bucket containing the ratio
        # (presumably get_masked_ratio yields a fraction in [0, 1]; it is scaled
        # to percent here -- verify against its implementation)
        percent = ratio * 100
        for center, subdir in zip(clustered_centors, clustered_subdirs):
            if center - radius <= percent <= center + radius:
                shutil.move(filename, subdir)
                break

    shutil.rmtree(root_dir)
    os.rename(clustered_dir, root_dir)


def decide_nStroke(args):
    """Pick the number of strokes for one mask.

    Args:
        args: parsed options; reads ``num_stroke`` and ``num_stroke_bound``.

    Returns:
        int: ``args.num_stroke`` when given; otherwise a random integer drawn
        with ``np.random.randint(low, high)`` from ``args.num_stroke_bound``,
        i.e. ``low`` inclusive and ``high`` exclusive.

    Raises:
        ValueError: if neither option is set (``np.random.randint`` also raises
            ``ValueError`` when ``low >= high``).
    """
    if args.num_stroke is not None:
        return args.num_stroke
    elif args.num_stroke_bound is not None:
        return np.random.randint(args.num_stroke_bound[0], args.num_stroke_bound[1])
    else:
        raise ValueError('One of "-ns" or "-nsb" is needed')


def _restore_clustered_masks(output_dir):
    """Move the content of every sub-directory of ``output_dir`` back into it."""
    for clustered_subdir in get_everything_under(output_dir):
        if not os.path.isdir(clustered_subdir):
            continue
        for f in get_everything_under(clustered_subdir):
            shutil.move(f, output_dir)
        os.rmdir(clustered_subdir)


def _generate_image_masks(args, preset):
    """Write ``args.n`` independent single-frame masks as ``<index>.png`` files."""
    for i in range(args.n):
        nStroke = decide_nStroke(args)
        mask = get_video_masks_by_moving_random_stroke(
            video_len=1, imageWidth=args.image_width, imageHeight=args.image_height,
            nStroke=nStroke, **preset
        )[0]
        mask.save(os.path.join(args.output_dir, f'{i:07d}.png'))


def _generate_video_masks(args, preset):
    """Write ``args.n`` mask sequences, each into its own sub-directory."""
    for i in range(args.n):
        mask_dir = make_dir_under_root(args.output_dir, f'{i:05d}')
        mask_reader = MaskReader(mask_dir, read=False)

        nStroke = decide_nStroke(args)
        masks = get_video_masks_by_moving_random_stroke(
            imageWidth=args.image_width, imageHeight=args.image_height,
            video_len=args.video_len, nStroke=nStroke, **preset)

        mask_reader.set_files(masks)
        mask_reader.save_files(output_dir=mask_reader.dir_name)


def main(args):
    """Generate (or re-process) masks according to the parsed options.

    Steps: resolve the stroke preset, create the output directory, then either
    un-cluster an existing output (``redo_without_generation``) or generate
    ``args.n`` masks / mask sequences; finally optionally create the
    border-erased copy and cluster the results by masked area.

    Args:
        args: parsed options as returned by ``parse_args``.

    Raises:
        ValueError: if ``output_dir`` is missing, if ``-n`` is missing when
            masks are to be generated, or if ``redo_without_generation`` is set
            but the output directory is empty.
        NotImplementedError: if the stroke preset is unknown.
    """
    preset = get_stroke_preset(args.stroke_preset)
    if args.output_dir is None:
        raise ValueError('"-od" is needed')
    make_dirs(args.output_dir)

    if args.redo_without_generation:
        # Explicit check instead of an assert, which is stripped under "python -O".
        if len(get_everything_under(args.output_dir)) == 0:
            raise ValueError(
                f'Nothing to redo: "{args.output_dir}" is empty.'
            )
        # put back clustered masks
        _restore_clustered_masks(args.output_dir)
    else:
        if args.n is None:
            raise ValueError('"-n" is needed to generate masks')
        if args.image_masks:
            _generate_image_masks(args, preset)
        else:
            _generate_video_masks(args, preset)

    if args.leave_boarder_unmasked is not None:
        # The border-erased copy must be made before the original is clustered,
        # since it is copied from the flat (un-clustered) layout.
        dir_leave_boarder = copy_masks_without_boarder(args.output_dir, args)
        if args.cluster_by_area:
            cluster_by_masked_area(dir_leave_boarder, args)

    if args.cluster_by_area:
        cluster_by_masked_area(args.output_dir, args)


if __name__ == "__main__":
    args = parse_args()
    main(args)