import math
import multiprocessing
import traceback
from pathlib import Path

import numpy as np
import numpy.linalg as npla

import samplelib
from core import pathex
from core.cv2ex import *
from core.interact import interact as io
from core.joblib import MPClassFuncOnDemand, MPFunc
from core.leras import nn
from DFLIMG import DFLIMG
from facelib import FaceEnhancer, FaceType, LandmarksProcessor, XSegNet
from merger import FrameInfo, InteractiveMergerSubprocessor, MergerConfig

def main (model_class_name=None,
          saved_models_path=None,
          training_data_src_path=None,
          force_model_name=None,
          input_path=None,
          output_path=None,
          output_mask_path=None,
          aligned_path=None,
          force_gpu_idxs=None,
          cpu_only=None):
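    """
    Merge predicted faces from a trained model into the frames found in input_path,
    writing merged frames to output_path and merge masks to output_mask_path.
    Aligned face metadata is read from aligned_path.
    """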
    io.log_info ("Running merger.\r\n")

    try:
        if not input_path.exists():
            io.log_err('Input directory not found. Please ensure it exists.')
            return

        if not output_path.exists():
            output_path.mkdir(parents=True, exist_ok=True)

        if not output_mask_path.exists():
            output_mask_path.mkdir(parents=True, exist_ok=True)

        if not saved_models_path.exists():
            io.log_err('Model directory not found. Please ensure it exists.')
            return

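        # 'models' is imported only after the path checks, so a bad directory fails fast before any model code is loaded.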
        import models
        model = models.import_model(model_class_name)(is_training=False,
                                                      saved_models_path=saved_models_path,
                                                      force_gpu_idxs=force_gpu_idxs,
                                                      force_model_name=force_model_name,
                                                      cpu_only=cpu_only)

        predictor_func, predictor_input_shape, cfg = model.get_MergerConfig()

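        # Wrap the predictor so the merger's worker processes can call it across process boundaries.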
        predictor_func = MPFunc(predictor_func)

        run_on_cpu = len(nn.getCurrentDeviceConfig().devices) == 0
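        # XSeg and FaceEnhancer are instantiated on demand inside the worker processes, with weights placed on CPU (and run on CPU when no GPU device is configured).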
        xseg_256_extract_func = MPClassFuncOnDemand(XSegNet, 'extract',
                                                    name='XSeg',
                                                    resolution=256,
                                                    weights_file_root=saved_models_path,
                                                    place_model_on_cpu=True,
                                                    run_on_cpu=run_on_cpu)

        face_enhancer_func = MPClassFuncOnDemand(FaceEnhancer, 'enhance',
                                                 place_model_on_cpu=True,
                                                 run_on_cpu=run_on_cpu)

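        # The interactive merger is unavailable on Colab; when merging non-interactively, ask for all merge settings up front.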
        is_interactive = io.input_bool ("Use interactive merger?", True) if not io.is_colab() else False

        if not is_interactive:
            cfg.ask_settings()

        subprocess_count = io.input_int("Number of workers?", max(8, multiprocessing.cpu_count()),
                                        valid_range=[1, multiprocessing.cpu_count()],
                                        help_message="Specify the number of worker processes. A low value may reduce merging speed; a high value may result in a memory error. The value may not exceed the number of CPU cores.")

        input_path_image_paths = pathex.get_image_paths(input_path)

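        # Masked merging: map every aligned face back to its source frame using the metadata stored in the DFL-aligned images.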
        if cfg.type == MergerConfig.TYPE_MASKED:
            if not aligned_path.exists():
                io.log_err('Aligned directory not found. Please ensure it exists.')
                return

            packed_samples = None
            try:
                packed_samples = samplelib.PackedFaceset.load(aligned_path)
            except Exception:
                io.log_err(f"Error occurred while loading samplelib.PackedFaceset from {str(aligned_path)}: {traceback.format_exc()}")

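            # A packed faceset bundles all aligned images into a single archive; otherwise read the aligned directory file by file.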
            if packed_samples is not None:
                io.log_info ("Using packed faceset.")
                def generator():
                    for sample in io.progress_bar_generator( packed_samples, "Collecting alignments"):
                        filepath = Path(sample.filename)
                        yield filepath, DFLIMG.load(filepath, loader_func=lambda x: sample.read_raw_file() )
            else:
                def generator():
                    for filepath in io.progress_bar_generator( pathex.get_image_paths(aligned_path), "Collecting alignments"):
                        filepath = Path(filepath)
                        yield filepath, DFLIMG.load(filepath)

            alignments = {}
            multiple_faces_detected = False

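            # Group source landmarks by source-frame filename stem; more than one aligned face for the same frame is flagged as a duplicate.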
            for filepath, dflimg in generator():
                if dflimg is None or not dflimg.has_data():
                    io.log_err (f"{filepath.name} is not a dfl image file")
                    continue

                source_filename = dflimg.get_source_filename()
                if source_filename is None:
                    continue

                source_filepath = Path(source_filename)
                source_filename_stem = source_filepath.stem

                if source_filename_stem not in alignments:
                    alignments[ source_filename_stem ] = []

                alignments_ar = alignments[ source_filename_stem ]
                alignments_ar.append ( (dflimg.get_source_landmarks(), filepath, source_filepath ) )

                if len(alignments_ar) > 1:
                    multiple_faces_detected = True

            if multiple_faces_detected:
                io.log_info ("")
                io.log_info ("Warning: multiple faces detected. Only one alignment file should refer to one source file.")
                io.log_info ("")

            for a_key in list(alignments.keys()):
                a_ar = alignments[a_key]
                if len(a_ar) > 1:
                    for _, filepath, source_filepath in a_ar:
                        io.log_info (f"alignment {filepath.name} refers to {source_filepath.name}")
                    io.log_info ("")

                alignments[a_key] = [ a[0] for a in a_ar]

            if multiple_faces_detected:
                io.log_info ("It is strongly recommended to process the faces separately.")
                io.log_info ("Use 'recover original filename' to determine the exact duplicates.")
                io.log_info ("")

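            # Build one Frame per input image, attaching the collected landmarks (if any) by source-filename stem.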
            frames = [ InteractiveMergerSubprocessor.Frame( frame_info=FrameInfo(filepath=Path(p),
                                                                                 landmarks_list=alignments.get(Path(p).stem, None)
                                                                                )
                                                          )
                       for p in input_path_image_paths ]

            if multiple_faces_detected:
                io.log_info ("Warning: multiple faces detected. Motion blur will not be used.")
                io.log_info ("")
            else:
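                # Estimate per-frame motion from the displacement of the face center between the previous and next frames; the magnitude and angle feed the merger's motion blur.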
                s = 256
                local_pts = [ (s//2-1, s//2-1), (s//2-1, 0) ]
                frames_len = len(frames)
                for i in io.progress_bar_generator( range(len(frames)), "Computing motion vectors"):
                    fi_prev = frames[max(0, i-1)].frame_info
                    fi      = frames[i].frame_info
                    fi_next = frames[min(i+1, frames_len-1)].frame_info
                    if len(fi_prev.landmarks_list) == 0 or \
                       len(fi.landmarks_list) == 0 or \
                       len(fi_next.landmarks_list) == 0:
                        continue

                    mat_prev = LandmarksProcessor.get_transform_mat ( fi_prev.landmarks_list[0], s, face_type=FaceType.FULL)
                    mat      = LandmarksProcessor.get_transform_mat ( fi.landmarks_list[0]     , s, face_type=FaceType.FULL)
                    mat_next = LandmarksProcessor.get_transform_mat ( fi_next.landmarks_list[0], s, face_type=FaceType.FULL)

                    pts_prev = LandmarksProcessor.transform_points (local_pts, mat_prev, True)
                    pts      = LandmarksProcessor.transform_points (local_pts, mat, True)
                    pts_next = LandmarksProcessor.transform_points (local_pts, mat_next, True)

                    prev_vector = pts[0]-pts_prev[0]
                    next_vector = pts_next[0]-pts[0]

                    motion_vector = pts_next[0] - pts_prev[0]
                    fi.motion_power = npla.norm(motion_vector)

                    motion_vector = motion_vector / fi.motion_power if fi.motion_power != 0 else np.array([0,0], dtype=np.float32)

                    fi.motion_deg = -math.atan2(motion_vector[1], motion_vector[0])*180 / math.pi

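        # Nothing to do without frames; otherwise InteractiveMergerSubprocessor runs the worker pool and writes the merged frames and masks.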
        if len(frames) == 0:
            io.log_info ("No frames to merge in input_dir.")
        else:
            InteractiveMergerSubprocessor (
                        is_interactive          = is_interactive,
                        merger_session_filepath = model.get_strpath_storage_for_file('merger_session.dat'),
                        predictor_func          = predictor_func,
                        predictor_input_shape   = predictor_input_shape,
                        face_enhancer_func      = face_enhancer_func,
                        xseg_256_extract_func   = xseg_256_extract_func,
                        merger_config           = cfg,
                        frames                  = frames,
                        frames_root_path        = input_path,
                        output_path             = output_path,
                        output_mask_path        = output_mask_path,
                        model_iter              = model.get_iter(),
                        subprocess_count        = subprocess_count,
                    ).run()

        model.finalize()

    except Exception:
        print ( traceback.format_exc() )

""" |
|
elif cfg.type == MergerConfig.TYPE_FACE_AVATAR: |
|
filesdata = [] |
|
for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"): |
|
filepath = Path(filepath) |
|
|
|
dflimg = DFLIMG.x(filepath) |
|
if dflimg is None: |
|
io.log_err ("%s is not a dfl image file" % (filepath.name) ) |
|
continue |
|
filesdata += [ ( FrameInfo(filepath=filepath, landmarks_list=[dflimg.get_landmarks()] ), dflimg.get_source_filename() ) ] |
|
|
|
filesdata = sorted(filesdata, key=operator.itemgetter(1)) #sort by source_filename |
|
frames = [] |
|
filesdata_len = len(filesdata) |
|
for i in range(len(filesdata)): |
|
frame_info = filesdata[i][0] |
|
|
|
prev_temporal_frame_infos = [] |
|
next_temporal_frame_infos = [] |
|
|
|
for t in range (cfg.temporal_face_count): |
|
prev_frame_info = filesdata[ max(i -t, 0) ][0] |
|
next_frame_info = filesdata[ min(i +t, filesdata_len-1 )][0] |
|
|
|
prev_temporal_frame_infos.insert (0, prev_frame_info ) |
|
next_temporal_frame_infos.append ( next_frame_info ) |
|
|
|
frames.append ( InteractiveMergerSubprocessor.Frame(prev_temporal_frame_infos=prev_temporal_frame_infos, |
|
frame_info=frame_info, |
|
next_temporal_frame_infos=next_temporal_frame_infos) ) |
|
""" |