| | import os |
| | import cv2 |
| | import numpy as np |
| | import psutil |
| |
|
| | from roop.ProcessOptions import ProcessOptions |
| |
|
| | from roop.face_util import get_first_face, get_all_faces, rotate_anticlockwise, rotate_clockwise, clamp_cut_values |
| | from roop.utilities import compute_cosine_distance, get_device, str_to_class |
| | import roop.vr_util as vr |
| |
|
| | from typing import Any, List, Callable |
| | from roop.typing import Frame, Face |
| | from concurrent.futures import ThreadPoolExecutor, as_completed |
| | from threading import Thread, Lock |
| | from queue import Queue |
| | from tqdm import tqdm |
| | from roop.ffmpeg_writer import FFMPEG_VideoWriter |
| | from roop.StreamWriter import StreamWriter |
| | import roop.globals |
| |
|
| |
|
| |
|
| | |
class eNoFaceAction():
    """Integer codes for handling a frame in which no face was swapped.

    ``ProcessMgr.process_frame`` compares ``roop.globals.no_face_action``
    against these constants.
    """
    USE_ORIGINAL_FRAME = 0
    RETRY_ROTATED = 1
    SKIP_FRAME = 2
    # Was written as ``3,`` — the trailing comma made this the tuple ``(3,)``,
    # which can never compare equal to an integer action code.
    SKIP_FRAME_IF_DISSIMILAR = 3
    USE_LAST_SWAPPED = 4
| |
|
| |
|
| |
|
def create_queue(temp_frame_paths: List[str]) -> Queue[str]:
    """Return a new FIFO queue holding the given frame paths in their original order."""
    pending: Queue[str] = Queue()
    for path in temp_frame_paths:
        pending.put(path)
    return pending
| |
|
| |
|
def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]:
    """Take up to ``queue_per_future`` items from ``queue`` and return them as a list.

    The queue is polled ``queue_per_future`` times; a poll that finds the queue
    empty contributes nothing, so fewer items may be returned.
    """
    batch: List[str] = []
    polls_left = queue_per_future
    while polls_left > 0:
        polls_left -= 1
        if queue.empty():
            continue
        batch.append(queue.get())
    return batch
| |
|
| |
|
| |
|
class ProcessMgr():
    """Runs the configured processors over image files or video frames.

    ``initialize`` selects processors and options for a run, ``run_batch``
    processes a list of image files with a thread pool, and
    ``run_batch_inmem`` streams a video through a reader thread, worker
    threads and a writer thread.
    """

    # Class-level defaults. Per-run values are assigned on the instance by
    # __init__, initialize() and the run_* methods.
    input_face_datas = []
    target_face_datas = []

    imagemask = None

    processors = []
    options: "ProcessOptions" = None

    num_threads = 1
    current_index = 0
    processing_threads = 1
    buffer_wait_time = 0.1

    lock = Lock()

    frames_queue = None
    processed_queue = None

    videowriter = None
    streamwriter = None

    progress_gradio = None
    total_frames = 0

    num_frames_no_face = 0
    last_swapped_frame = None

    output_to_file = None
    output_to_cam = None

    # Maps the processor key used in options.processors to the class name that
    # initialize() loads from the 'roop.processors.<classname>' module.
    plugins = {
        'faceswap': 'FaceSwapInsightFace',
        'mask_clip2seg': 'Mask_Clip2Seg',
        'mask_xseg': 'Mask_XSeg',
        'codeformer': 'Enhance_CodeFormer',
        'gfpgan': 'Enhance_GFPGAN',
        'dmdnet': 'Enhance_DMDNet',
        'gpen': 'Enhance_GPEN',
        'restoreformer++': 'Enhance_RestoreFormerPPlus',
        'colorizer': 'Frame_Colorizer',
        'filter_generic': 'Frame_Filter',
        'removebg': 'Frame_Masking',
        'upscale': 'Frame_Upscale'
    }

    _PROGRESS_BAR_FORMAT = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'

    def __init__(self, progress):
        """Create a manager; ``progress`` is an optional progress callback (may be None)."""
        # Give each instance its own containers so that in-place mutation
        # (e.g. processors.clear()) never touches the shared class attributes.
        self.input_face_datas = []
        self.target_face_datas = []
        self.processors = []
        if progress is not None:
            self.progress_gradio = progress

    def reuseOldProcessor(self, name: str):
        """Return the already loaded processor whose ``processorname`` equals ``name``, or None."""
        for p in self.processors:
            if p.processorname == name:
                return p
        return None

    def initialize(self, input_faces, target_faces, options):
        """Store faces and options for a run and (re)build the processor list.

        Processors not named in ``options.processors`` are released, processors
        with a matching name are reused, and missing ones are loaded through
        ``str_to_class``. Every kept processor is re-initialized with its
        option dict plus the current device name. The image mask in
        ``options.imagemask`` is converted in place and
        ``options.frame_processing`` is set when any processor type starts
        with ``"frame_"``.
        """
        self.input_face_datas = input_faces
        self.target_face_datas = target_faces
        self.num_frames_no_face = 0
        self.last_swapped_frame = None
        self.options = options
        devicename = get_device()

        roop.globals.g_desired_face_analysis = ["landmark_3d_68", "landmark_2d_106", "detection", "recognition"]
        if options.swap_mode == "all_female" or options.swap_mode == "all_male":
            roop.globals.g_desired_face_analysis.append("genderage")

        # Release processors that the new options no longer request.
        for p in self.processors:
            if p.processorname not in options.processors:
                p.Release()

        newprocessors = []
        for key, extoption in options.processors.items():
            p = self.reuseOldProcessor(key)
            if p is None:
                classname = self.plugins[key]
                module = 'roop.processors.' + classname
                p = str_to_class(module, classname)
            if p is not None:
                extoption.update({"devicename": devicename})
                p.Initialize(extoption)
                newprocessors.append(p)
            else:
                print(f"Not using {module}")
        self.processors = newprocessors

        # NOTE(review): the source lost its indentation, so it is unclear
        # whether the original `else: imagemask = None` belonged to the
        # dict check or to the np.any() check. This version clears the mask
        # for an all-zero layer and for a dict without usable layers, and
        # leaves any non-dict value untouched — confirm against the original.
        mask = self.options.imagemask
        if isinstance(mask, dict) and mask.get("layers") and len(mask["layers"]) > 0:
            mask = cv2.cvtColor(mask.get("layers")[0], cv2.COLOR_RGBA2GRAY)
            if np.any(mask):
                mo = self.input_face_datas[0].faces[0].mask_offsets
                mask = self.blur_area(mask, mo[4], mo[5])
                mask = mask.astype(np.float32) / 255
                self.options.imagemask = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
            else:
                self.options.imagemask = None
        elif isinstance(mask, dict):
            # A dict would break the later `.shape` comparison in swap_faces.
            self.options.imagemask = None

        self.options.frame_processing = any(p.type.startswith("frame_") for p in self.processors)

    def run_batch(self, source_files, target_files, threads: int = 1):
        """Process ``source_files`` into ``target_files`` using ``threads`` pool workers.

        The file list is split into chunks of ``len(source_files) // threads``
        (at least 1) and each chunk is handed to ``process_frames``. Worker
        exceptions are re-raised here through ``future.result()``.
        """
        self.total_frames = len(source_files)
        self.num_threads = threads
        with tqdm(total=self.total_frames, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=self._PROGRESS_BAR_FORMAT) as progress:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                futures = []
                queue = create_queue(source_files)
                queue_per_future = max(len(source_files) // threads, 1)
                while not queue.empty():
                    future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress))
                    futures.append(future)
                for future in as_completed(futures):
                    future.result()

    def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None:
        """Process each file in ``current_files`` and write the result to its target path.

        The target path is the entry of ``target_files`` at the position the
        file has in ``source_files``. Stops early when
        ``roop.globals.processing`` becomes false. Files that cannot be
        decoded, or whose processing yields None, are not written.
        """
        for f in current_files:
            if not roop.globals.processing:
                return

            # np.fromfile + imdecode instead of cv2.imread: the path is opened by numpy.
            temp_frame = cv2.imdecode(np.fromfile(f, dtype=np.uint8), cv2.IMREAD_COLOR)
            if temp_frame is not None:
                if self.options.frame_processing:
                    # Chain the processors (each one receives the previous
                    # output), matching process_videoframes. Previously every
                    # processor ran on the original image and only the last
                    # result survived; an empty list raised NameError.
                    resimg = temp_frame
                    for p in self.processors:
                        resimg = p.Run(resimg)
                else:
                    resimg = self.process_frame(temp_frame)
                if resimg is not None:
                    i = source_files.index(f)
                    cv2.imencode(f'.{roop.globals.CFG.output_image_format}', resimg)[1].tofile(target_files[i])
            # NOTE(review): update() is assumed to run once per file, whether
            # or not it could be decoded — nesting reconstructed, confirm.
            if update:
                update()

    def read_frames_thread(self, cap, frame_start, frame_end, num_threads):
        """Read frames from ``cap`` and distribute them round-robin to the worker queues.

        Reads at most ``frame_end - frame_start`` frames starting at
        ``frame_start``, stopping early when the capture ends or
        ``roop.globals.processing`` becomes false. Finally puts one ``None``
        sentinel into every worker queue.
        """
        num_frame = 0
        total_num = frame_end - frame_start
        if frame_start > 0:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_start)

        while roop.globals.processing:
            ret, frame = cap.read()
            if not ret:
                break

            self.frames_queue[num_frame % num_threads].put(frame, block=True)
            num_frame += 1
            if num_frame == total_num:
                break

        for i in range(num_threads):
            self.frames_queue[i].put(None)

    def process_videoframes(self, threadindex, progress) -> None:
        """Worker loop: process frames from queue ``threadindex`` until the ``None`` sentinel.

        Each result is put into the matching processed queue as
        ``(True, image)``; on the sentinel ``(False, None)`` is put and the
        loop ends. ``progress`` is called once per processed frame.
        """
        while True:
            frame = self.frames_queue[threadindex].get()
            if frame is None:
                self.processing_threads -= 1
                self.processed_queue[threadindex].put((False, None))
                return

            if self.options.frame_processing:
                for p in self.processors:
                    frame = p.Run(frame)
                resimg = frame
            else:
                resimg = self.process_frame(frame)
            self.processed_queue[threadindex].put((True, resimg))
            del frame
            progress()

    def write_frames_thread(self):
        """Writer loop: take results from the processed queues round-robin and output them.

        Frames go to the video writer and/or stream writer depending on
        ``output_to_file`` / ``output_to_cam``. A ``(True, None)`` entry is
        dropped; the loop ends after one ``(False, None)`` entry per worker
        has been seen.
        """
        nextindex = 0
        num_producers = self.num_threads

        while True:
            process, frame = self.processed_queue[nextindex % self.num_threads].get()
            nextindex += 1
            if frame is not None:
                if self.output_to_file:
                    self.videowriter.write_frame(frame)
                if self.output_to_cam:
                    self.streamwriter.WriteToStream(frame)
                del frame
            elif process == False:
                num_producers -= 1
                if num_producers < 1:
                    return

    def run_batch_inmem(self, output_method, source_video, target_video, frame_start, frame_end, fps, threads: int = 1):
        """Process a video in memory with one reader, ``threads`` workers and one writer.

        Output goes to ``target_video`` unless ``output_method`` is
        ``"Virtual Camera"``, and to the stream writer when it is
        ``"Virtual Camera"`` or ``"Both"``. Prints a message and returns
        without doing anything when no processor is configured.
        """
        if len(self.processors) < 1:
            print("No processor defined!")
            return

        cap = cv2.VideoCapture(source_video)

        frame_count = (frame_end - frame_start) + 1
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # A processor may change the output size; the last one reporting wins.
        processed_resolution = None
        for p in self.processors:
            if hasattr(p, 'getProcessedResolution'):
                processed_resolution = p.getProcessedResolution(width, height)
                print(f"Processed resolution: {processed_resolution}")
        if processed_resolution is not None:
            width = processed_resolution[0]
            height = processed_resolution[1]

        self.total_frames = frame_count
        self.num_threads = threads

        self.processing_threads = self.num_threads
        # One single-slot queue pair per worker.
        self.frames_queue = []
        self.processed_queue = []
        for _ in range(threads):
            self.frames_queue.append(Queue(1))
            self.processed_queue.append(Queue(1))

        self.output_to_file = output_method != "Virtual Camera"
        self.output_to_cam = output_method == "Virtual Camera" or output_method == "Both"

        if self.output_to_file:
            self.videowriter = FFMPEG_VideoWriter(target_video, (width, height), fps, codec=roop.globals.video_encoder, crf=roop.globals.video_quality, audiofile=None)
        if self.output_to_cam:
            self.streamwriter = StreamWriter((width, height), int(fps))

        readthread = Thread(target=self.read_frames_thread, args=(cap, frame_start, frame_end, threads))
        readthread.start()

        writethread = Thread(target=self.write_frames_thread)
        writethread.start()

        with tqdm(total=self.total_frames, desc='Processing', unit='frames', dynamic_ncols=True, bar_format=self._PROGRESS_BAR_FORMAT) as progress:
            with ThreadPoolExecutor(thread_name_prefix='swap_proc', max_workers=self.num_threads) as executor:
                futures = []

                for threadindex in range(threads):
                    future = executor.submit(self.process_videoframes, threadindex, lambda: self.update_progress(progress))
                    futures.append(future)

                for future in as_completed(futures):
                    future.result()

        readthread.join()
        writethread.join()
        cap.release()
        if self.output_to_file:
            self.videowriter.close()
        if self.output_to_cam:
            self.streamwriter.Close()

        self.frames_queue.clear()
        self.processed_queue.clear()

    def update_progress(self, progress: Any = None) -> None:
        """Advance the tqdm bar by one and show process memory (GB) and thread count.

        Also forwards the current position to ``progress_gradio`` when set.
        """
        process = psutil.Process(os.getpid())
        memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
        progress.set_postfix({
            'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB',
            'execution_threads': self.num_threads
        })
        progress.update(1)
        if self.progress_gradio is not None:
            self.progress_gradio((progress.n, self.total_frames), desc='Processing', total=self.total_frames, unit='frames')

    def process_frame(self, frame: "Frame"):
        """Swap faces in ``frame`` and apply the configured no-face action.

        Returns the swapped frame when at least one face was swapped (or None
        for SKIP_FRAME_IF_DISSIMILAR when fewer faces were swapped than there
        are input faces). When nothing was swapped the result depends on
        ``roop.globals.no_face_action``: a copy of the last swapped frame, the
        original frame, None, or the result of ``retry_rotated``. The frame is
        returned untouched when there are no input faces and face masking
        display is off.
        """
        if len(self.input_face_datas) < 1 and not self.options.show_face_masking:
            return frame

        num_swapped, temp_frame = self.swap_faces(frame, frame.copy())
        action = roop.globals.no_face_action

        if num_swapped > 0:
            if action == eNoFaceAction.SKIP_FRAME_IF_DISSIMILAR and len(self.input_face_datas) > num_swapped:
                return None
            self.num_frames_no_face = 0
            self.last_swapped_frame = temp_frame.copy()
            return temp_frame

        if action == eNoFaceAction.USE_LAST_SWAPPED:
            if self.last_swapped_frame is not None and self.num_frames_no_face < self.options.max_num_reuse_frame:
                self.num_frames_no_face += 1
                return self.last_swapped_frame.copy()
            return frame
        if action == eNoFaceAction.USE_ORIGINAL_FRAME:
            return frame
        if action == eNoFaceAction.SKIP_FRAME:
            return None
        return self.retry_rotated(frame)

    def retry_rotated(self, frame):
        """Retry the swap on the frame rotated clockwise, then anticlockwise.

        Returns the first successful result rotated back to the original
        orientation, or the unchanged ``frame`` when neither attempt swapped
        a face.
        """
        attempts = (
            (rotate_clockwise, rotate_anticlockwise),
            (rotate_anticlockwise, rotate_clockwise),
        )
        for rotate, undo in attempts:
            rotated = rotate(frame.copy())
            num_swapped, swapped = self.swap_faces(rotated, rotated.copy())
            if num_swapped > 0:
                return undo(swapped)
        return frame

    def swap_faces(self, frame, temp_frame):
        """Detect faces in ``frame`` and swap them into ``temp_frame`` per ``options.swap_mode``.

        Returns ``(num_faces_found, image)``. The image is the untouched
        ``frame`` when nothing was swapped, or when VR mode is on and an odd
        number of faces was handled; otherwise it is the processed frame,
        blended with ``options.imagemask`` when that mask has the frame's shape.
        """
        num_faces_found = 0
        mode = self.options.swap_mode

        if mode == "first":
            face = get_first_face(frame)
            if face is None:
                return num_faces_found, frame

            num_faces_found += 1
            temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
        else:
            faces = get_all_faces(frame)
            if faces is None:
                return num_faces_found, frame

            if mode == "all":
                for face in faces:
                    num_faces_found += 1
                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

            elif mode == "all_input":
                # Pair detected face i with input face i; stop once inputs run out.
                for i, face in enumerate(faces):
                    num_faces_found += 1
                    if i < len(self.input_face_datas):
                        temp_frame = self.process_face(i, face, temp_frame)
                    else:
                        break

            elif mode == "selected":
                # NOTE(review): nesting of the counter and the break below was
                # reconstructed from a paste that lost indentation — confirm.
                num_targetfaces = len(self.target_face_datas)
                use_index = num_targetfaces == 1
                for i, tf in enumerate(self.target_face_datas):
                    for face in faces:
                        if compute_cosine_distance(tf.embedding, face.embedding) <= self.options.face_distance_threshold:
                            if i < len(self.input_face_datas):
                                if use_index:
                                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
                                else:
                                    temp_frame = self.process_face(i, face, temp_frame)
                                num_faces_found += 1
                            if not roop.globals.vr_mode and num_faces_found == num_targetfaces:
                                break

            elif mode == "all_female" or mode == "all_male":
                gender = 'F' if mode == "all_female" else 'M'
                for face in faces:
                    if face.sex == gender:
                        num_faces_found += 1
                        temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

            # Drop the detected faces as soon as they are no longer needed.
            faces.clear()

        if roop.globals.vr_mode and num_faces_found % 2 > 0:
            # In VR mode an odd face count is treated as "nothing swapped".
            return 0, frame
        if num_faces_found == 0:
            return num_faces_found, frame

        if self.options.imagemask is not None and self.options.imagemask.shape == frame.shape:
            temp_frame = self.simple_blend_with_mask(temp_frame, frame, self.options.imagemask)
        return num_faces_found, temp_frame

    def rotation_action(self, original_face: "Face", frame: "Frame"):
        """Return the rotation name needed for a face whose box is wider than tall, else None.

        The direction is chosen from the x positions of landmarks 72 and 0 of
        ``landmark_2d_106``; when those are equal, from whether the box centre
        lies left or right of the frame centre.
        """
        width = frame.shape[1]

        bounding_box_width = original_face.bbox[2] - original_face.bbox[0]
        bounding_box_height = original_face.bbox[3] - original_face.bbox[1]
        horizontal_face = bounding_box_width > bounding_box_height
        if not horizontal_face:
            return None

        center_x = width // 2.0
        bbox_center_x = original_face.bbox[0] + (bounding_box_width // 2.0)

        # Named as in the original code; presumably landmark 72 is near the
        # forehead and landmark 0 near the chin — verify against the model.
        forehead_x = original_face.landmark_2d_106[72][0]
        chin_x = original_face.landmark_2d_106[0][0]

        if chin_x < forehead_x:
            return "rotate_anticlockwise"
        if forehead_x < chin_x:
            return "rotate_clockwise"
        if bbox_center_x >= center_x:
            return "rotate_anticlockwise"
        if bbox_center_x < center_x:
            return "rotate_clockwise"
        return None

    def auto_rotate_frame(self, original_face, frame: "Frame"):
        """Rotate ``frame`` as ``rotation_action`` suggests.

        Returns ``(original_face, frame, rotation_action)``; the frame is
        returned unrotated when the action is None.
        """
        rotation_action = self.rotation_action(original_face, frame)

        if rotation_action == "rotate_anticlockwise":
            frame = rotate_anticlockwise(frame)
        elif rotation_action == "rotate_clockwise":
            frame = rotate_clockwise(frame)

        return original_face, frame, rotation_action

    def auto_unrotate_frame(self, frame: "Frame", rotation_action):
        """Apply the inverse of ``rotation_action`` to ``frame`` (no-op for other values)."""
        if rotation_action == "rotate_anticlockwise":
            return rotate_clockwise(frame)
        if rotation_action == "rotate_clockwise":
            return rotate_anticlockwise(frame)
        return frame

    def process_face(self, face_index, target_face: "Face", frame: "Frame"):
        """Run all processors on one detected face and paste the result into ``frame``.

        ``face_index`` selects the entry of ``input_face_datas`` to swap in.
        With ``roop.globals.autorotate_faces`` a sideways face is cut out,
        rotated and re-detected before processing, and the result is rotated
        back and pasted into a copy of the frame. Returns the resulting image.
        """
        from roop.face_util import align_crop

        enhanced_frame = None
        if len(self.input_face_datas) > 0:
            inputface = self.input_face_datas[face_index].faces[0]
        else:
            inputface = None

        rotation_action = None
        if roop.globals.autorotate_faces:
            rotation_action = self.rotation_action(target_face, frame)
            if rotation_action is not None:
                (startX, startY, endX, endY) = target_face["bbox"].astype("int")
                width = endX - startX
                height = endY - startY
                # Enlarge the cutout by 25% of the larger box side on every edge.
                offs = int(max(width, height) * 0.25)
                rotcutframe, startX, startY, endX, endY = self.cutout(frame, startX - offs, startY - offs, endX + offs, endY + offs)
                if rotation_action == "rotate_anticlockwise":
                    rotcutframe = rotate_anticlockwise(rotcutframe)
                elif rotation_action == "rotate_clockwise":
                    rotcutframe = rotate_clockwise(rotcutframe)

                # Re-detect on the rotated cutout; fall back to the unrotated
                # face when detection fails there.
                rotface = get_first_face(rotcutframe)
                if rotface is None:
                    rotation_action = None
                else:
                    saved_frame = frame.copy()
                    frame = rotcutframe
                    target_face = rotface

        # Code ported/adapted from Facefusion which borrowed the idea from Rope:
        # Kind of subsampling the cutout and aligned face image and faceswapping
        # slices of it up to the desired output resolution. This works around
        # the current resolution limitations without using enhancers.
        model_output_size = 128
        subsample_size = self.options.subsample_size
        subsample_total = subsample_size // model_output_size
        aligned_img, M = align_crop(frame, target_face.kps, subsample_size)

        fake_frame = aligned_img
        target_face.matrix = M

        for p in self.processors:
            if p.type == 'swap':
                swap_result_frames = []
                subsample_frames = self.implode_pixel_boost(aligned_img, model_output_size, subsample_total)
                for sliced_frame in subsample_frames:
                    for _ in range(0, self.options.num_swap_steps):
                        sliced_frame = self.prepare_crop_frame(sliced_frame)
                        sliced_frame = p.Run(inputface, target_face, sliced_frame)
                        sliced_frame = self.normalize_swap_frame(sliced_frame)
                    swap_result_frames.append(sliced_frame)
                fake_frame = self.explode_pixel_boost(swap_result_frames, model_output_size, subsample_total, subsample_size)
                fake_frame = fake_frame.astype(np.uint8)
                scale_factor = 0.0
            elif p.type == 'mask':
                fake_frame = self.process_mask(p, aligned_img, fake_frame)
            else:
                enhanced_frame, scale_factor = p.Run(self.input_face_datas[face_index], target_face, fake_frame)

        upscale = 512
        orig_width = fake_frame.shape[1]
        if orig_width != upscale:
            # The flag must be passed by keyword: the third positional
            # parameter of cv2.resize is `dst`, so the flag was ignored before.
            fake_frame = cv2.resize(fake_frame, (upscale, upscale), interpolation=cv2.INTER_CUBIC)
        mask_offsets = (0, 0, 0, 0, 1, 20) if inputface is None else inputface.mask_offsets

        if enhanced_frame is None:
            scale_factor = int(upscale / orig_width)
            result = self.paste_upscale(fake_frame, fake_frame, target_face.matrix, frame, scale_factor, mask_offsets)
        else:
            result = self.paste_upscale(fake_frame, enhanced_frame, target_face.matrix, frame, scale_factor, mask_offsets)

        # The mouth is restored before un-rotating, while `frame` and
        # `target_face` still share the same orientation.
        if self.options.restore_original_mouth:
            mouth_cutout, mouth_bb = self.create_mouth_mask(target_face, frame)
            result = self.apply_mouth_area(result, mouth_cutout, mouth_bb)

        if rotation_action is not None:
            fake_frame = self.auto_unrotate_frame(result, rotation_action)
            result = self.paste_simple(fake_frame, saved_frame, startX, startY)

        return result

    def cutout(self, frame: "Frame", start_x, start_y, end_x, end_y):
        """Clamp the rectangle to the frame and return ``(view, start_x, start_y, end_x, end_y)``.

        The first element is a slice of ``frame`` (not a copy); the remaining
        four are the clamped coordinates.
        """
        start_x = max(start_x, 0)
        start_y = max(start_y, 0)
        end_x = min(end_x, frame.shape[1])
        end_y = min(end_y, frame.shape[0])
        return frame[start_y:end_y, start_x:end_x], start_x, start_y, end_x, end_y

    def paste_simple(self, src: "Frame", dest: "Frame", start_x, start_y):
        """Write ``src`` into ``dest`` with its top-left corner at ``(start_x, start_y)``.

        The target rectangle is passed through ``clamp_cut_values`` first.
        ``dest`` is modified in place and returned.
        """
        end_x = start_x + src.shape[1]
        end_y = start_y + src.shape[0]

        start_x, end_x, start_y, end_y = clamp_cut_values(start_x, end_x, start_y, end_y, dest)
        dest[start_y:end_y, start_x:end_x] = src
        return dest

    def simple_blend_with_mask(self, image1, image2, mask):
        """Return ``image1 * (1 - mask) + image2 * mask`` as uint8."""
        keep = image1.astype(np.float32) * (1.0 - mask)
        replace = image2.astype(np.float32) * mask
        return (keep + replace).astype(np.uint8)

    def paste_upscale(self, fake_face, upsk_face, M, target_img, scale_factor, mask_offsets):
        """Warp the processed face back into ``target_img`` through a feathered matte.

        ``M`` (multiplied by ``scale_factor``) is inverted to map the aligned
        face into frame coordinates. ``mask_offsets`` holds top, bottom, left
        and right fractions to exclude, then the erosion iterations and blur
        amount passed to ``blur_area``. When ``upsk_face`` is a different
        array than ``fake_face`` the two are mixed using
        ``options.blend_ratio``. Returns a uint8 image.
        """
        M_scale = M * scale_factor
        IM = cv2.invertAffineTransform(M_scale)
        target_size = (target_img.shape[1], target_img.shape[0])

        # Matte in face space: 255 inside the offset rectangle, 0 elsewhere.
        img_matte = np.zeros((upsk_face.shape[0], upsk_face.shape[1]), dtype=np.uint8)
        w = img_matte.shape[1]
        h = img_matte.shape[0]

        top = int(mask_offsets[0] * h)
        bottom = int(h - (mask_offsets[1] * h))
        left = int(mask_offsets[2] * w)
        right = int(w - (mask_offsets[3] * w))
        img_matte[top:bottom, left:right] = 255

        # Move the matte into frame space and zero its outermost pixels.
        img_matte = cv2.warpAffine(img_matte, IM, target_size, flags=cv2.INTER_NEAREST, borderValue=0.0)
        img_matte[:1, :] = img_matte[-1:, :] = img_matte[:, :1] = img_matte[:, -1:] = 0

        img_matte = self.blur_area(img_matte, mask_offsets[4], mask_offsets[5])
        img_matte = img_matte.astype(np.float32) / 255

        if self.options.show_face_area_overlay:
            # Green wherever the matte is non-zero; blended in at the end.
            green_overlay = np.zeros_like(target_img)
            green_color = [0, 255, 0]
            for i in range(3):
                green_overlay[:, :, i] = np.where(img_matte > 0, green_color[i], 0)

        img_matte = np.reshape(img_matte, [img_matte.shape[0], img_matte.shape[1], 1])
        paste_face = cv2.warpAffine(upsk_face, IM, target_size, borderMode=cv2.BORDER_REPLICATE)
        if upsk_face is not fake_face:
            fake_face = cv2.warpAffine(fake_face, IM, target_size, borderMode=cv2.BORDER_REPLICATE)
            paste_face = cv2.addWeighted(paste_face, self.options.blend_ratio, fake_face, 1.0 - self.options.blend_ratio, 0)

        # Composite: face where the matte is 1, target image where it is 0.
        paste_face = img_matte * paste_face
        paste_face = paste_face + (1 - img_matte) * target_img.astype(np.float32)
        if self.options.show_face_area_overlay:
            paste_face = cv2.addWeighted(paste_face.astype(np.uint8), 1 - 0.5, green_overlay, 0.5, 0)
        return paste_face.astype(np.uint8)

    def blur_area(self, img_matte, num_erosion_iterations, blur_amount):
        """Erode and Gaussian-blur a matte, with kernel sizes scaled to its 255-valued area.

        Returns ``img_matte`` unchanged when it contains no pixel equal to 255
        (previously this raised ValueError from ``np.max`` on an empty array).
        """
        mask_h_inds, mask_w_inds = np.where(img_matte == 255)
        if mask_h_inds.size == 0:
            return img_matte

        # Geometric mean of the extent of the 255 region in both directions.
        mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
        mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
        mask_size = int(np.sqrt(mask_h * mask_w))

        k = max(mask_size // (blur_amount // 2), blur_amount // 2)
        kernel = np.ones((k, k), np.uint8)
        img_matte = cv2.erode(img_matte, kernel, iterations=num_erosion_iterations)

        k = max(mask_size // blur_amount, blur_amount // 5)
        # 2*k+1 keeps the Gaussian kernel size odd.
        blur_size = (2 * k + 1, 2 * k + 1)
        return cv2.GaussianBlur(img_matte, blur_size, 0)

    def prepare_crop_frame(self, swap_frame):
        """Convert an HxWx3 image into a 1x3xHxW float32 array with reversed channel order.

        Values are divided by 255 and normalized with mean 0 / std 1 (the
        ``'ghost'`` branch is unreachable while ``model_type`` is fixed).
        """
        model_type = 'inswapper'
        model_mean = [0.0, 0.0, 0.0]
        model_standard_deviation = [1.0, 1.0, 1.0]

        if model_type == 'ghost':
            swap_frame = swap_frame[:, :, ::-1] / 127.5 - 1
        else:
            swap_frame = swap_frame[:, :, ::-1] / 255.0
            swap_frame = (swap_frame - model_mean) / model_standard_deviation
        swap_frame = swap_frame.transpose(2, 0, 1)
        swap_frame = np.expand_dims(swap_frame, axis=0).astype(np.float32)
        return swap_frame

    def normalize_swap_frame(self, swap_frame):
        """Convert a 3xHxW array back to HxWx3, scaled by 255, rounded, channels reversed."""
        model_type = 'inswapper'
        swap_frame = swap_frame.transpose(1, 2, 0)

        if model_type == 'ghost':
            swap_frame = (swap_frame * 127.5 + 127.5).round()
        else:
            swap_frame = (swap_frame * 255.0).round()
        swap_frame = swap_frame[:, :, ::-1]
        return swap_frame

    def implode_pixel_boost(self, aligned_face_frame, model_size, pixel_boost_total: int):
        """Split an image into ``pixel_boost_total ** 2`` interleaved sub-images.

        Sub-image ``i * pixel_boost_total + j`` equals
        ``aligned_face_frame[i::pixel_boost_total, j::pixel_boost_total]``;
        each has shape ``(model_size, model_size, 3)``.
        """
        subsample_frame = aligned_face_frame.reshape(model_size, pixel_boost_total, model_size, pixel_boost_total, 3)
        subsample_frame = subsample_frame.transpose(1, 3, 0, 2, 4).reshape(pixel_boost_total ** 2, model_size, model_size, 3)
        return subsample_frame

    def explode_pixel_boost(self, subsample_frame, model_size, pixel_boost_total, pixel_boost_size):
        """Inverse of ``implode_pixel_boost``: re-interleave sub-images into one image.

        Returns an array of shape ``(pixel_boost_size, pixel_boost_size, 3)``.
        """
        final_frame = np.stack(subsample_frame, axis=0).reshape(pixel_boost_total, pixel_boost_total, model_size, model_size, 3)
        final_frame = final_frame.transpose(2, 0, 3, 1, 4).reshape(pixel_boost_size, pixel_boost_size, 3)
        return final_frame

    def process_mask(self, processor, frame: "Frame", target: "Frame"):
        """Blend ``frame`` back into ``target`` where the mask processor marks pixels.

        With ``options.show_face_masking`` the result is instead ``frame``
        with the masked pixels darkened. Returns a uint8 image.
        """
        img_mask = processor.Run(frame, self.options.masking_text)
        img_mask = cv2.resize(img_mask, (target.shape[1], target.shape[0]))
        img_mask = np.reshape(img_mask, [img_mask.shape[0], img_mask.shape[1], 1])

        if self.options.show_face_masking:
            result = (1 - img_mask) * frame.astype(np.float32)
            return np.uint8(result)

        target = target.astype(np.float32)
        result = (1 - img_mask) * target
        result += img_mask * frame.astype(np.float32)
        return np.uint8(result)

    def create_mouth_mask(self, face: "Face", frame: "Frame"):
        """Copy an enlarged box around landmarks 52..70 of ``landmark_2d_106`` from ``frame``.

        Returns ``(cutout, (min_x, min_y, max_x, max_y))``. When the face has
        no landmarks the result is ``(None, (0, 0, 0, 0))`` (previously this
        raised UnboundLocalError).
        """
        landmarks = face.landmark_2d_106
        if landmarks is None:
            return None, (0, 0, 0, 0)

        mouth_points = landmarks[52:71].astype(np.int32)

        # Bounding box of the points, padded and clamped to the frame.
        min_x, min_y = np.min(mouth_points, axis=0)
        max_x, max_y = np.max(mouth_points, axis=0)
        min_x = max(0, min_x - (15 * 6))
        min_y = max(0, min_y - 22)
        max_x = min(frame.shape[1], max_x + (15 * 6))
        max_y = min(frame.shape[0], max_y + (90 * 6))

        mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
        return mouth_cutout, (min_x, min_y, max_x, max_y)

    def create_feathered_mask(self, shape, feather_amount=30):
        """Return a float32 mask with a blurred filled ellipse, scaled so its maximum is 1."""
        mask = np.zeros(shape[:2], dtype=np.float32)
        center = (shape[1] // 2, shape[0] // 2)
        cv2.ellipse(mask, center, (shape[1] // 2 - feather_amount, shape[0] // 2 - feather_amount),
                    0, 0, 360, 1, -1)
        mask = cv2.GaussianBlur(mask, (feather_amount * 2 + 1, feather_amount * 2 + 1), 0)
        return mask / np.max(mask)

    def apply_mouth_area(self, frame: np.ndarray, mouth_cutout: np.ndarray, mouth_box: tuple) -> np.ndarray:
        """Blend ``mouth_cutout`` into ``frame`` at ``mouth_box`` with colour matching and feathering.

        ``frame`` is modified in place and returned. It is returned unchanged
        when there is no cutout/box, and any error during blending is printed
        and otherwise ignored (best effort).
        """
        # Check before unpacking: the old test ran after the subtraction and
        # inspected values that could never be None.
        if mouth_cutout is None or mouth_box is None:
            return frame

        min_x, min_y, max_x, max_y = mouth_box
        box_width = max_x - min_x
        box_height = max_y - min_y

        try:
            resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height))
            roi = frame[min_y:max_y, min_x:max_x]

            if roi.shape != resized_mouth_cutout.shape:
                resized_mouth_cutout = cv2.resize(resized_mouth_cutout, (roi.shape[1], roi.shape[0]))

            color_corrected_mouth = self.apply_color_transfer(resized_mouth_cutout, roi)

            feather_amount = min(30, box_width // 15, box_height // 15)
            mask = self.create_feathered_mask(resized_mouth_cutout.shape, feather_amount)

            mask = mask[:, :, np.newaxis]
            blended = (color_corrected_mouth * mask + roi * (1 - mask)).astype(np.uint8)

            frame[min_y:max_y, min_x:max_x] = blended
        except Exception as e:
            print(f'Error {e}')

        return frame

    def apply_color_transfer(self, source, target):
        """
        Apply color transfer from target to source image

        Both images are converted from BGR to LAB, the per-channel mean and
        standard deviation of ``source`` are matched to those of ``target``,
        and the result is converted back to BGR uint8.
        """
        source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
        target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")

        source_mean, source_std = cv2.meanStdDev(source)
        target_mean, target_std = cv2.meanStdDev(target)

        source_mean = source_mean.reshape(1, 1, 3)
        source_std = source_std.reshape(1, 1, 3)
        target_mean = target_mean.reshape(1, 1, 3)
        target_std = target_std.reshape(1, 1, 3)

        # A flat channel has std 0; dividing by it produced inf/nan. With 1.0
        # the channel simply takes the target mean.
        safe_source_std = np.where(source_std == 0, 1.0, source_std)

        source = (source - source_mean) * (target_std / safe_source_std) + target_mean
        return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)

    @staticmethod
    def unload_models():
        """Placeholder that does nothing.

        Declared static because it takes no ``self``; this keeps
        ``ProcessMgr.unload_models()`` working and also allows calls on an
        instance, which previously raised TypeError.
        """
        pass

    def release_resources(self):
        """Release all processors and close any open video/stream writer."""
        for p in self.processors:
            p.Release()
        self.processors.clear()
        if self.videowriter is not None:
            self.videowriter.close()
            # Forget the writer so a repeated call does not close it again.
            self.videowriter = None
        if self.streamwriter is not None:
            self.streamwriter.Close()
            self.streamwriter = None
| |
|
| |
|