import math |
import multiprocessing |
import traceback |
from pathlib import Path |
import numpy as np |
import numpy.linalg as npla |
import samplelib |
from core import pathex |
from core.cv2ex import * |
from core.interact import interact as io |
from core.joblib import MPClassFuncOnDemand, MPFunc |
from core.leras import nn |
from DFLIMG import DFLIMG |
from facelib import FaceEnhancer, FaceType, LandmarksProcessor, XSegNet |
from merger import FrameInfo, InteractiveMergerSubprocessor, MergerConfig |
def main (model_class_name=None, |
saved_models_path=None, |
training_data_src_path=None, |
force_model_name=None, |
input_path=None, |
output_path=None, |
output_mask_path=None, |
aligned_path=None, |
force_gpu_idxs=None, |
cpu_only=None): |
io.log_info ("Running merger.\r\n") |
try: |
if not input_path.exists(): |
io.log_err('Input directory not found. Please ensure it exists.') |
return |
if not output_path.exists(): |
output_path.mkdir(parents=True, exist_ok=True) |
if not output_mask_path.exists(): |
output_mask_path.mkdir(parents=True, exist_ok=True) |
if not saved_models_path.exists(): |
io.log_err('Model directory not found. Please ensure it exists.') |
return |
import models |
model = models.import_model(model_class_name)(is_training=False, |
saved_models_path=saved_models_path, |
force_gpu_idxs=force_gpu_idxs, |
force_model_name=force_model_name, |
cpu_only=cpu_only) |
predictor_func, predictor_input_shape, cfg = model.get_MergerConfig() |
predictor_func = MPFunc(predictor_func) |
run_on_cpu = len(nn.getCurrentDeviceConfig().devices) == 0 |
xseg_256_extract_func = MPClassFuncOnDemand(XSegNet, 'extract', |
name='XSeg', |
resolution=256, |
weights_file_root=saved_models_path, |
place_model_on_cpu=True, |
run_on_cpu=run_on_cpu) |
face_enhancer_func = MPClassFuncOnDemand(FaceEnhancer, 'enhance', |
place_model_on_cpu=True, |
run_on_cpu=run_on_cpu) |
is_interactive = io.input_bool ("Use interactive merger?", True) if not io.is_colab() else False |
if not is_interactive: |
cfg.ask_settings() |
subprocess_count = io.input_int("Number of workers?", max(8, multiprocessing.cpu_count()), |
valid_range=[1, multiprocessing.cpu_count()], help_message="Specify the number of threads to process. A low value may affect performance. A high value may result in memory error. The value may not be greater than CPU cores." ) |
input_path_image_paths = pathex.get_image_paths(input_path) |
if cfg.type == MergerConfig.TYPE_MASKED: |
if not aligned_path.exists(): |
io.log_err('Aligned directory not found. Please ensure it exists.') |
return |
packed_samples = None |
try: |
packed_samples = samplelib.PackedFaceset.load(aligned_path) |
except: |
io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {str(aligned_path)}, {traceback.format_exc()}") |
if packed_samples is not None: |
io.log_info ("Using packed faceset.") |
def generator(): |
for sample in io.progress_bar_generator( packed_samples, "Collecting alignments"): |
filepath = Path(sample.filename) |
yield filepath, DFLIMG.load(filepath, loader_func=lambda x: sample.read_raw_file() ) |
else: |
def generator(): |
for filepath in io.progress_bar_generator( pathex.get_image_paths(aligned_path), "Collecting alignments"): |
filepath = Path(filepath) |
yield filepath, DFLIMG.load(filepath) |
alignments = {} |
multiple_faces_detected = False |
for filepath, dflimg in generator(): |
if dflimg is None or not dflimg.has_data(): |
io.log_err (f"{filepath.name} is not a dfl image file") |
continue |
source_filename = dflimg.get_source_filename() |
if source_filename is None: |
continue |
source_filepath = Path(source_filename) |
source_filename_stem = source_filepath.stem |
if source_filename_stem not in alignments.keys(): |
alignments[ source_filename_stem ] = [] |
alignments_ar = alignments[ source_filename_stem ] |
alignments_ar.append ( (dflimg.get_source_landmarks(), filepath, source_filepath ) ) |
if len(alignments_ar) > 1: |
multiple_faces_detected = True |
if multiple_faces_detected: |
io.log_info ("") |
io.log_info ("Warning: multiple faces detected. Only one alignment file should refer one source file.") |
io.log_info ("") |
for a_key in list(alignments.keys()): |
a_ar = alignments[a_key] |
if len(a_ar) > 1: |
for _, filepath, source_filepath in a_ar: |
io.log_info (f"alignment {filepath.name} refers to {source_filepath.name} ") |
io.log_info ("") |
alignments[a_key] = [ a[0] for a in a_ar] |
if multiple_faces_detected: |
io.log_info ("It is strongly recommended to process the faces separatelly.") |
io.log_info ("Use 'recover original filename' to determine the exact duplicates.") |
io.log_info ("") |
frames = [ InteractiveMergerSubprocessor.Frame( frame_info=FrameInfo(filepath=Path(p), |
landmarks_list=alignments.get(Path(p).stem, None) |
) |
) |
for p in input_path_image_paths ] |
if multiple_faces_detected: |
io.log_info ("Warning: multiple faces detected. Motion blur will not be used.") |
io.log_info ("") |
else: |
s = 256 |
local_pts = [ (s//2-1, s//2-1), (s//2-1,0) ] |
frames_len = len(frames) |
for i in io.progress_bar_generator( range(len(frames)) , "Computing motion vectors"): |
fi_prev = frames[max(0, i-1)].frame_info |
fi = frames[i].frame_info |
fi_next = frames[min(i+1, frames_len-1)].frame_info |
if len(fi_prev.landmarks_list) == 0 or \ |
len(fi.landmarks_list) == 0 or \ |
len(fi_next.landmarks_list) == 0: |
continue |
mat_prev = LandmarksProcessor.get_transform_mat ( fi_prev.landmarks_list[0], s, face_type=FaceType.FULL) |
mat = LandmarksProcessor.get_transform_mat ( fi.landmarks_list[0] , s, face_type=FaceType.FULL) |
mat_next = LandmarksProcessor.get_transform_mat ( fi_next.landmarks_list[0], s, face_type=FaceType.FULL) |
pts_prev = LandmarksProcessor.transform_points (local_pts, mat_prev, True) |
pts = LandmarksProcessor.transform_points (local_pts, mat, True) |
pts_next = LandmarksProcessor.transform_points (local_pts, mat_next, True) |
prev_vector = pts[0]-pts_prev[0] |
next_vector = pts_next[0]-pts[0] |
motion_vector = pts_next[0] - pts_prev[0] |
fi.motion_power = npla.norm(motion_vector) |
motion_vector = motion_vector / fi.motion_power if fi.motion_power != 0 else np.array([0,0],dtype=np.float32) |
fi.motion_deg = -math.atan2(motion_vector[1],motion_vector[0])*180 / math.pi |
if len(frames) == 0: |
io.log_info ("No frames to merge in input_dir.") |
else: |
if False: |
pass |
else: |
InteractiveMergerSubprocessor ( |
is_interactive = is_interactive, |
merger_session_filepath = model.get_strpath_storage_for_file('merger_session.dat'), |
predictor_func = predictor_func, |
predictor_input_shape = predictor_input_shape, |
face_enhancer_func = face_enhancer_func, |
xseg_256_extract_func = xseg_256_extract_func, |
merger_config = cfg, |
frames = frames, |
frames_root_path = input_path, |
output_path = output_path, |
output_mask_path = output_mask_path, |
model_iter = model.get_iter(), |
subprocess_count = subprocess_count, |
).run() |
model.finalize() |
except Exception as e: |
print ( traceback.format_exc() ) |
""" |
elif cfg.type == MergerConfig.TYPE_FACE_AVATAR: |
filesdata = [] |
for filepath in io.progress_bar_generator(input_path_image_paths, "Collecting info"): |
filepath = Path(filepath) |
dflimg = DFLIMG.x(filepath) |
if dflimg is None: |
io.log_err ("%s is not a dfl image file" % (filepath.name) ) |
continue |
filesdata += [ ( FrameInfo(filepath=filepath, landmarks_list=[dflimg.get_landmarks()] ), dflimg.get_source_filename() ) ] |
filesdata = sorted(filesdata, key=operator.itemgetter(1)) #sort by source_filename |
frames = [] |
filesdata_len = len(filesdata) |
for i in range(len(filesdata)): |
frame_info = filesdata[i][0] |
prev_temporal_frame_infos = [] |
next_temporal_frame_infos = [] |
for t in range (cfg.temporal_face_count): |
prev_frame_info = filesdata[ max(i -t, 0) ][0] |
next_frame_info = filesdata[ min(i +t, filesdata_len-1 )][0] |
prev_temporal_frame_infos.insert (0, prev_frame_info ) |
next_temporal_frame_infos.append ( next_frame_info ) |
frames.append ( InteractiveMergerSubprocessor.Frame(prev_temporal_frame_infos=prev_temporal_frame_infos, |
frame_info=frame_info, |
next_temporal_frame_infos=next_temporal_frame_infos) ) |
""" |