Spaces:
Sleeping
Sleeping
import sys | |
sys.path.append('/share/group_machongyang/limengtian/libycnn2/lib') | |
import argparse | |
import os | |
import cv2 | |
import numpy as np | |
from multiprocessing import Process | |
from tqdm import tqdm | |
from py_ycnn import face_detection | |
detector = face_detection() | |
detector.init_face_detection('/share/group_machongyang/limengtian/libycnn2/model/') | |
def work(lst, output_folder, layout, wid): | |
need_A = 'A' in layout | |
need_B = 'B' in layout | |
if need_A: | |
os.mkdir(os.path.join(output_folder, f'lmk_A_{wid:02d}')) | |
if need_B: | |
os.mkdir(os.path.join(output_folder, f'lmk_B_{wid:02d}')) | |
res_list = [] | |
for i, (A_path, B_path) in tqdm(enumerate(lst)): | |
if os.path.isfile(A_path) and os.path.isfile(B_path): | |
if need_A: | |
A = cv2.imread(A_path) | |
A = detector.detectArray(A) | |
if len(A) < 1: | |
continue | |
A = np.array(A[0], dtype=np.float32) | |
A = A[:202].reshape(101, 2) | |
np.save(os.path.join(output_folder, f'lmk_A_{wid:02d}', f'{i:06d}.npy'), A) | |
if need_B: | |
B = cv2.imread(B_path) | |
B = detector.detectArray(B) | |
if len(B) < 1: | |
continue | |
B = np.array(B[0], dtype=np.float32) | |
B = B[:202].reshape(101, 2) | |
np.save(os.path.join(output_folder, f'lmk_B_{wid:02d}', f'{i:06d}.npy'), B) | |
if layout[0] == 'A': | |
lmk_A_path = os.path.join(output_folder, f'lmk_A_{wid:02d}', f'{i:06d}.npy') | |
else: | |
lmk_A_path = os.path.join(output_folder, f'lmk_B_{wid:02d}', f'{i:06d}.npy') | |
if layout[1] == 'A': | |
lmk_B_path = os.path.join(output_folder, f'lmk_A_{wid:02d}', f'{i:06d}.npy') | |
else: | |
lmk_B_path = os.path.join(output_folder, f'lmk_B_{wid:02d}', f'{i:06d}.npy') | |
res_list.append((A_path, B_path, lmk_A_path, lmk_B_path)) | |
with open(os.path.join(output_folder, f'lmk_{wid:02d}.txt'), 'wt') as f: | |
for A_path, B_path, lmk_A_path, lmk_B_path in res_list: | |
f.write(f'{A_path} {B_path} {lmk_A_path} {lmk_B_path}\n') | |
if __name__ == '__main__': | |
# parse arguments | |
parser = argparse.ArgumentParser(description='Landmark Generator') | |
parser.add_argument('--input_list', type=str, required=True) | |
parser.add_argument('--output_folder', type=str, required=True) | |
parser.add_argument('--layout', type=str, default='AB') | |
parser.add_argument('--worker_count', type=int, default=16) | |
args = parser.parse_args() | |
# check arguments | |
assert os.path.isfile(args.input_list), f'FileNotFound: {args.input_list}.' | |
assert not os.path.exists(args.output_folder), 'Output folder exists.' | |
assert args.layout in ['AA', 'AB', 'BA', 'BB'], 'Layout error.' | |
assert args.worker_count >= 1 and args.worker_count <= 64, 'Worker count error.' | |
os.makedirs(args.output_folder) | |
# read input list and split to each process | |
lsts = [ [] for _ in range(args.worker_count) ] | |
with open(args.input_list, 'rt') as f: | |
line = f.readline().strip() | |
total_cnt = 0 | |
while line: | |
A_path, B_path = line.split(' ') | |
lsts[total_cnt % args.worker_count].append((A_path, B_path)) | |
line = f.readline().strip() | |
total_cnt += 1 | |
print('Total images:', total_cnt) | |
# generate landmark | |
pool = [] | |
for i in range(args.worker_count): | |
proc = Process(target=work, args=(lsts[i], args.output_folder, args.layout, i)) | |
proc.start() | |
pool.append(proc) | |
for proc in pool: | |
proc.join() | |
# combine lists | |
new_fn = os.path.splitext(os.path.split(args.input_list)[1])[0] + '_with_lmk.txt' | |
with open(os.path.join(args.output_folder, new_fn), 'wt') as fw: | |
for i in range(args.worker_count): | |
with open(os.path.join(args.output_folder, f'lmk_{i:02d}.txt'), 'rt') as fr: | |
line = fr.readline() | |
while line: | |
fw.write(line) | |
line = fr.readline() | |