"""Processing manager that runs the configured roop processors over image files or video frames."""

import os
import cv2
import numpy as np
import psutil

from enum import Enum
from roop.ProcessOptions import ProcessOptions

from roop.face_util import get_first_face, get_all_faces, rotate_image_180, rotate_anticlockwise, rotate_clockwise, clamp_cut_values
from roop.utilities import compute_cosine_distance, get_device, str_to_class
import roop.vr_util as vr

from typing import Any, List, Callable
from roop.typing import Frame, Face
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread, Lock
from queue import Queue
from tqdm import tqdm
from roop.ffmpeg_writer import FFMPEG_VideoWriter
import roop.globals


# Poor man's enum to be able to compare to int
class eNoFaceAction():
    USE_ORIGINAL_FRAME = 0
    RETRY_ROTATED = 1
    SKIP_FRAME = 2
    SKIP_FRAME_IF_DISSIMILAR = 3


def create_queue(temp_frame_paths: List[str]) -> Queue[str]:
    """Return a new queue filled with the given frame paths, in order."""
    queue: Queue[str] = Queue()
    for frame_path in temp_frame_paths:
        queue.put(frame_path)
    return queue


def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]:
    """Remove up to ``queue_per_future`` entries from ``queue`` and return them as a list."""
    queues = []
    for _ in range(queue_per_future):
        if not queue.empty():
            queues.append(queue.get())
    return queues


class ProcessMgr():
    """Runs the configured processors over image files or in-memory video frames."""

    # Face data sets assigned by initialize().
    input_face_datas = []
    target_face_datas = []

    imagemask = None

    # Processor instances rebuilt by initialize() and the options they were built from.
    processors = []
    options : ProcessOptions = None

    num_threads = 1
    current_index = 0
    processing_threads = 1
    buffer_wait_time = 0.1

    lock = Lock()

    # Per-worker queues, created by run_batch_inmem().
    frames_queue = None
    processed_queue = None

    videowriter = None

    # Optional progress callback handed to __init__().
    progress_gradio = None
    total_frames = 0

    # Maps a processor key from the options to its class name below 'roop.processors.'.
    plugins = {
        'faceswap'          : 'FaceSwapInsightFace',
        'mask_clip2seg'     : 'Mask_Clip2Seg',
        'mask_xseg'         : 'Mask_XSeg',
        'codeformer'        : 'Enhance_CodeFormer',
        'gfpgan'            : 'Enhance_GFPGAN',
        'dmdnet'            : 'Enhance_DMDNet',
        'gpen'              : 'Enhance_GPEN',
        'restoreformer++'   : 'Enhance_RestoreFormerPPlus',
        'colorizer'         : 'Frame_Colorizer',
        'filter_generic'    : 'Frame_Filter',
        'removebg'          : 'Frame_Masking',
        'upscale'           : 'Frame_Upscale'
    }

    def __init__(self, progress):
        """Store the optional progress callback (``None`` keeps it disabled)."""
        if progress is not None:
            self.progress_gradio = progress

    def reuseOldProcessor(self, name:str):
        """Return the already created processor whose ``processorname`` equals ``name``, else ``None``."""
        for p in self.processors:
            if p.processorname == name:
                return p

        return None

    def initialize(self, input_faces, target_faces, options):
        """Store faces and options, (re)build the processor list and prepare the image mask.

        Processors that are no longer requested in ``options.processors`` are
        released, requested ones are reused when they already exist and
        created through ``str_to_class`` otherwise.
        """
        self.input_face_datas = input_faces
        self.target_face_datas = target_faces
        self.options = options
        devicename = get_device()

        roop.globals.g_desired_face_analysis = ["landmark_3d_68", "landmark_2d_106", "detection", "recognition"]
        if options.swap_mode == "all_female" or options.swap_mode == "all_male":
            roop.globals.g_desired_face_analysis.append("genderage")

        # Release every processor that is not part of the new configuration.
        for p in self.processors:
            if p.processorname not in options.processors:
                p.Release()

        newprocessors = []
        for key, extoption in options.processors.items():
            p = self.reuseOldProcessor(key)
            if p is None:
                classname = self.plugins[key]
                module = 'roop.processors.' + classname
                p = str_to_class(module, classname)
            if p is not None:
                extoption.update({"devicename": devicename})
                p.Initialize(extoption)
                newprocessors.append(p)
            else:
                print(f"Not using {module}")
        self.processors = newprocessors

        if isinstance(self.options.imagemask, dict) and self.options.imagemask.get("layers") and len(self.options.imagemask["layers"]) > 0:
            self.options.imagemask = self.options.imagemask.get("layers")[0]
            # Get rid of alpha
            self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_RGBA2GRAY)
            if np.any(self.options.imagemask):
                # Without input faces fall back to the same default offsets process_face() uses.
                if self.input_face_datas:
                    mo = self.input_face_datas[0].faces[0].mask_offsets
                else:
                    mo = (0, 0, 0, 0, 1, 20)
                self.options.imagemask = self.blur_area(self.options.imagemask, mo[4], mo[5])
                self.options.imagemask = self.options.imagemask.astype(np.float32) / 255
                self.options.imagemask = cv2.cvtColor(self.options.imagemask, cv2.COLOR_GRAY2RGB)
            else:
                self.options.imagemask = None
        elif isinstance(self.options.imagemask, dict):
            # A dict without usable layers can not be blended; swap_faces() reads
            # `.shape` from the mask, so a leftover dict would raise there.
            self.options.imagemask = None

        self.options.frame_processing = False
        for p in self.processors:
            if p.type.startswith("frame_"):
                self.options.frame_processing = True

    def run_batch(self, source_files, target_files, threads:int = 1):
        """Process ``source_files`` into ``target_files`` using a pool of ``threads`` workers."""
        progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
        self.total_frames = len(source_files)
        self.num_threads = threads
        with tqdm(total=self.total_frames, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                futures = []
                queue = create_queue(source_files)
                queue_per_future = max(len(source_files) // threads, 1)
                while not queue.empty():
                    future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress))
                    futures.append(future)
                for future in as_completed(futures):
                    future.result()

    def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None:
        """Process the files in ``current_files`` and write each result to its target path.

        The target path is looked up at the index the file has in
        ``source_files``. Stops early when ``roop.globals.processing`` is
        cleared. ``update`` is called once per handled file.
        """
        for f in current_files:
            if not roop.globals.processing:
                return

            # Decode the byte array into an OpenCV image
            temp_frame = cv2.imdecode(np.fromfile(f, dtype=np.uint8), cv2.IMREAD_COLOR)
            if temp_frame is not None:
                if self.options.frame_processing:
                    # Feed each processor the output of the previous one, exactly
                    # like process_videoframes() does, instead of keeping only
                    # the last processor's result.
                    resimg = temp_frame
                    for p in self.processors:
                        resimg = p.Run(resimg)
                else:
                    resimg = self.process_frame(temp_frame)
                if resimg is not None:
                    i = source_files.index(f)
                    cv2.imwrite(target_files[i], resimg)
            if update:
                update()

    def read_frames_thread(self, cap, frame_start, frame_end, num_threads):
        """Read frames from ``cap`` and distribute them round-robin over the worker queues.

        Every worker queue receives a final ``None`` as end marker.
        """
        num_frame = 0
        # NOTE(review): run_batch_inmem() counts (frame_end - frame_start) + 1 frames
        # while this reads one less - confirm whether frame_end is inclusive.
        total_num = frame_end - frame_start
        if frame_start > 0:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_start)

        while roop.globals.processing:
            ret, frame = cap.read()
            if not ret:
                break

            self.frames_queue[num_frame % num_threads].put(frame, block=True)
            num_frame += 1
            if num_frame == total_num:
                break

        for i in range(num_threads):
            self.frames_queue[i].put(None)

    def process_videoframes(self, threadindex, progress) -> None:
        """Worker loop: process frames from this worker's input queue until ``None`` arrives.

        Results are put on the matching output queue as ``(True, image)``;
        the end of the stream is signalled with ``(False, None)``.
        """
        while True:
            frame = self.frames_queue[threadindex].get()
            if frame is None:
                self.processing_threads -= 1
                self.processed_queue[threadindex].put((False, None))
                return

            if self.options.frame_processing:
                for p in self.processors:
                    frame = p.Run(frame)
                resimg = frame
            else:
                resimg = self.process_frame(frame)
            self.processed_queue[threadindex].put((True, resimg))
            # Drop the reference before blocking on the next get().
            del frame
            progress()

    def write_frames_thread(self):
        """Take processed frames from the output queues in round-robin order and write them.

        Returns once every worker has sent its ``(False, None)`` end marker.
        Entries of the form ``(True, None)`` are skipped without writing.
        """
        nextindex = 0
        num_producers = self.num_threads

        while True:
            process, frame = self.processed_queue[nextindex % self.num_threads].get()
            nextindex += 1
            if frame is not None:
                self.videowriter.write_frame(frame)
                del frame
            elif not process:
                num_producers -= 1
                if num_producers < 1:
                    return

    def run_batch_inmem(self, source_video, target_video, frame_start, frame_end, fps, threads:int = 1, skip_audio=False):
        """Process ``source_video`` into ``target_video`` with reader, worker and writer threads.

        ``skip_audio`` is accepted but not used by this method.
        """
        cap = cv2.VideoCapture(source_video)
        # frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_count = (frame_end - frame_start) + 1
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # A processor may report a different output resolution; the last one reporting wins.
        processed_resolution = None
        for p in self.processors:
            if hasattr(p, 'getProcessedResolution'):
                processed_resolution = p.getProcessedResolution(width, height)
                print(f"Processed resolution: {processed_resolution}")
        if processed_resolution is not None:
            width = processed_resolution[0]
            height = processed_resolution[1]

        self.total_frames = frame_count
        self.num_threads = threads

        self.processing_threads = self.num_threads
        self.frames_queue = []
        self.processed_queue = []
        for _ in range(threads):
            self.frames_queue.append(Queue(1))
            self.processed_queue.append(Queue(1))

        self.videowriter = FFMPEG_VideoWriter(target_video, (width, height), fps, codec=roop.globals.video_encoder, crf=roop.globals.video_quality, audiofile=None)

        readthread = Thread(target=self.read_frames_thread, args=(cap, frame_start, frame_end, threads))
        readthread.start()

        writethread = Thread(target=self.write_frames_thread)
        writethread.start()

        progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
        with tqdm(total=self.total_frames, desc='Processing', unit='frames', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
            with ThreadPoolExecutor(thread_name_prefix='swap_proc', max_workers=self.num_threads) as executor:
                futures = []
                for threadindex in range(threads):
                    future = executor.submit(self.process_videoframes, threadindex, lambda: self.update_progress(progress))
                    futures.append(future)

                for future in as_completed(futures):
                    future.result()
        # wait for the task to complete
        readthread.join()
        writethread.join()
        cap.release()
        self.videowriter.close()
        self.frames_queue.clear()
        self.processed_queue.clear()

    def update_progress(self, progress: Any = None) -> None:
        """Advance ``progress`` by one and show memory usage and thread count as postfix."""
        process = psutil.Process(os.getpid())
        memory_usage = process.memory_info().rss / 1024 / 1024 / 1024
        progress.set_postfix({
            'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB',
            'execution_threads': self.num_threads
        })
        progress.update(1)
        if self.progress_gradio is not None:
            self.progress_gradio((progress.n, self.total_frames), desc='Processing', total=self.total_frames, unit='frames')

    # https://github.com/deepinsight/insightface#third-party-re-implementation-of-arcface
    # https://github.com/deepinsight/insightface/blob/master/alignment/coordinate_reg/image_infer.py
    # https://github.com/deepinsight/insightface/issues/1350
    # https://github.com/linghu8812/tensorrt_inference

    def process_frame(self, frame:Frame):
        """Swap the faces in ``frame`` and return the result.

        Returns ``None`` when the frame is to be skipped. When nothing was
        swapped, ``roop.globals.no_face_action`` decides between the original
        frame, skipping, and retrying on rotated copies.
        """
        if len(self.input_face_datas) < 1 and not self.options.show_face_masking:
            return frame
        temp_frame = frame.copy()
        num_swapped, temp_frame = self.swap_faces(frame, temp_frame)
        if num_swapped > 0:
            if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME_IF_DISSIMILAR:
                if len(self.input_face_datas) > num_swapped:
                    return None
            return temp_frame
        if roop.globals.no_face_action == eNoFaceAction.USE_ORIGINAL_FRAME:
            return frame
        if roop.globals.no_face_action == eNoFaceAction.SKIP_FRAME:
            # This only works with in-mem processing, as it simply skips the frame.
            # For 'extract frames' it simply leaves the unprocessed frame unprocessed
            # and it gets used in the final output by ffmpeg.
            # If we could delete that frame here, that would work, but it might cause
            # ffmpeg to fail unless the frames are renamed, and the frame number is
            # presumably not known here.
            # Alternatively, all such frames could be marked for deletion, deleted at
            # the end, and the remaining frames renamed.
            return None
        return self.retry_rotated(frame)

    def retry_rotated(self, frame):
        """Retry the swap on a clockwise, then an anticlockwise rotated copy of ``frame``.

        Returns the swapped image rotated back, or ``frame`` itself when
        neither rotation produced a swap.
        """
        copyframe = frame.copy()
        copyframe = rotate_clockwise(copyframe)
        temp_frame = copyframe.copy()
        num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
        if num_swapped > 0:
            return rotate_anticlockwise(temp_frame)

        copyframe = frame.copy()
        copyframe = rotate_anticlockwise(copyframe)
        temp_frame = copyframe.copy()
        num_swapped, temp_frame = self.swap_faces(copyframe, temp_frame)
        if num_swapped > 0:
            return rotate_clockwise(temp_frame)
        return frame

    def swap_faces(self, frame, temp_frame):
        """Swap faces according to ``options.swap_mode``.

        Faces are detected in ``frame`` and the processing is applied to
        ``temp_frame``.

        Returns:
            ``(number of processed faces, image)``. The image is the untouched
            ``frame`` whenever the returned count is 0.
        """
        num_faces_found = 0

        if self.options.swap_mode == "first":
            face = get_first_face(frame)

            if face is None:
                return num_faces_found, frame

            num_faces_found += 1
            temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

        else:
            faces = get_all_faces(frame)
            if faces is None:
                return num_faces_found, frame

            if self.options.swap_mode == "all":
                for face in faces:
                    num_faces_found += 1
                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

            elif self.options.swap_mode == "selected":
                num_targetfaces = len(self.target_face_datas)
                # With a single target face the selected input face is used.
                use_index = num_targetfaces == 1
                for i, tf in enumerate(self.target_face_datas):
                    for face in faces:
                        if compute_cosine_distance(tf.embedding, face.embedding) <= self.options.face_distance_threshold:
                            if i < len(self.input_face_datas):
                                if use_index:
                                    temp_frame = self.process_face(self.options.selected_index, face, temp_frame)
                                else:
                                    temp_frame = self.process_face(i, face, temp_frame)
                                num_faces_found += 1
                            if not roop.globals.vr_mode and num_faces_found == num_targetfaces:
                                break

            elif self.options.swap_mode == "all_female" or self.options.swap_mode == "all_male":
                gender = 'F' if self.options.swap_mode == "all_female" else 'M'
                for face in faces:
                    if face.sex == gender:
                        num_faces_found += 1
                        temp_frame = self.process_face(self.options.selected_index, face, temp_frame)

        if roop.globals.vr_mode and num_faces_found % 2 > 0:
            # stereo image, there has to be an even number of faces
            num_faces_found = 0
            return num_faces_found, frame
        if num_faces_found == 0:
            return num_faces_found, frame

        #maskprocessor = next((x for x in self.processors if x.type == 'mask'), None)

        if self.options.imagemask is not None and self.options.imagemask.shape == frame.shape:
            temp_frame = self.simple_blend_with_mask(temp_frame, frame, self.options.imagemask)
        return num_faces_found, temp_frame

    def rotation_action(self, original_face:Face, frame:Frame):
        """Return ``"rotate_anticlockwise"``, ``"rotate_clockwise"`` or ``None`` for a face.

        A rotation is only suggested when the bounding box of the face is
        wider than high.
        """
        (height, width) = frame.shape[:2]

        bounding_box_width = original_face.bbox[2] - original_face.bbox[0]
        bounding_box_height = original_face.bbox[3] - original_face.bbox[1]
        horizontal_face = bounding_box_width > bounding_box_height

        center_x = width // 2.0
        start_x = original_face.bbox[0]
        end_x = original_face.bbox[2]
        bbox_center_x = start_x + (bounding_box_width // 2.0)

        # need to leverage the array of landmarks as described here:
        # https://github.com/deepinsight/insightface/tree/master/alignment/coordinate_reg
        # basically, we should be able to check for the relative position of eyes and nose
        # then use that to determine which way the face is actually facing when in a horizontal position
        # and use that to determine the correct rotation_action

        forehead_x = original_face.landmark_2d_106[72][0]
        chin_x = original_face.landmark_2d_106[0][0]

        if horizontal_face:
            if chin_x < forehead_x:
                # this is someone lying down with their face like this (:
                return "rotate_anticlockwise"
            elif forehead_x < chin_x:
                # this is someone lying down with their face like this :)
                return "rotate_clockwise"
            if bbox_center_x >= center_x:
                # this is someone lying down with their face in the right hand side of the frame
                return "rotate_anticlockwise"
            if bbox_center_x < center_x:
                # this is someone lying down with their face in the left hand side of the frame
                return "rotate_clockwise"

        return None

    def auto_rotate_frame(self, original_face, frame:Frame):
        """Rotate ``frame`` as suggested by ``rotation_action()``.

        Returns:
            ``(original_face, possibly rotated frame, rotation action)``.
        """
        target_face = original_face

        rotation_action = self.rotation_action(original_face, frame)

        if rotation_action == "rotate_anticlockwise":
            #face is horizontal, rotating frame anti-clockwise and getting face bounding box from rotated frame
            frame = rotate_anticlockwise(frame)
        elif rotation_action == "rotate_clockwise":
            #face is horizontal, rotating frame clockwise and getting face bounding box from rotated frame
            frame = rotate_clockwise(frame)

        return target_face, frame, rotation_action

    def auto_unrotate_frame(self, frame:Frame, rotation_action):
        """Undo the given rotation action on ``frame``; other values return ``frame`` as is."""
        if rotation_action == "rotate_anticlockwise":
            return rotate_clockwise(frame)
        elif rotation_action == "rotate_clockwise":
            return rotate_anticlockwise(frame)

        return frame

    def process_face(self, face_index, target_face:Face, frame:Frame):
        """Run all processors on ``target_face`` and paste the result back into the frame.

        Args:
            face_index: index into ``input_face_datas`` of the input face to use.
            target_face: detected face in ``frame`` that gets processed.
            frame: image containing ``target_face``.

        Returns:
            The image with the processed face pasted in.
        """
        from roop.face_util import align_crop

        enhanced_frame = None
        if len(self.input_face_datas) > 0:
            inputface = self.input_face_datas[face_index].faces[0]
        else:
            inputface = None

        rotation_action = None
        if roop.globals.autorotate_faces:
            # check for sideways rotation of face
            rotation_action = self.rotation_action(target_face, frame)
            if rotation_action is not None:
                (startX, startY, endX, endY) = target_face["bbox"].astype("int")
                width = endX - startX
                height = endY - startY
                # Cut out the face with 25% margin and work on the rotated cutout.
                offs = int(max(width, height) * 0.25)
                rotcutframe, startX, startY, endX, endY = self.cutout(frame, startX - offs, startY - offs, endX + offs, endY + offs)
                if rotation_action == "rotate_anticlockwise":
                    rotcutframe = rotate_anticlockwise(rotcutframe)
                elif rotation_action == "rotate_clockwise":
                    rotcutframe = rotate_clockwise(rotcutframe)
                # rotate image and re-detect face to correct wonky landmarks
                rotface = get_first_face(rotcutframe)
                if rotface is None:
                    rotation_action = None
                else:
                    saved_frame = frame.copy()
                    frame = rotcutframe
                    target_face = rotface

        # if roop.globals.vr_mode:
        #     bbox = target_face.bbox
        #     [orig_width, orig_height, _] = frame.shape
        #     # Convert bounding box to ints
        #     x1, y1, x2, y2 = map(int, bbox)
        #     # Determine the center of the bounding box
        #     x_center = (x1 + x2) / 2
        #     y_center = (y1 + y2) / 2
        #     # Normalize coordinates to range [-1, 1]
        #     x_center_normalized = x_center / (orig_width / 2) - 1
        #     y_center_normalized = y_center / (orig_width / 2) - 1
        #     # Convert normalized coordinates to spherical (theta, phi)
        #     theta = x_center_normalized * 180  # Theta ranges from -180 to 180 degrees
        #     phi = -y_center_normalized * 90  # Phi ranges from -90 to 90 degrees
        #     img = vr.GetPerspective(frame, 90, theta, phi, 1280, 1280)  # Generate perspective image

        aligned_img, M = align_crop(frame, target_face.kps, 128)
        fake_frame = aligned_img
        swap_frame = aligned_img
        target_face.matrix = M
        for p in self.processors:
            if p.type == 'swap':
                if inputface is not None:
                    for _ in range(0, self.options.num_swap_steps):
                        swap_frame = p.Run(inputface, target_face, swap_frame)
                    fake_frame = swap_frame
                scale_factor = 0.0
            elif p.type == 'mask':
                fake_frame = self.process_mask(p, aligned_img, fake_frame)
            else:
                enhanced_frame, scale_factor = p.Run(self.input_face_datas[face_index], target_face, fake_frame)

        upscale = 512
        orig_width = fake_frame.shape[1]
        # The third positional parameter of cv2.resize is `dst`, so the
        # interpolation flag has to be passed by keyword to take effect.
        fake_frame = cv2.resize(fake_frame, (upscale, upscale), interpolation=cv2.INTER_CUBIC)
        mask_offsets = (0, 0, 0, 0, 1, 20) if inputface is None else inputface.mask_offsets

        if enhanced_frame is None:
            scale_factor = int(upscale / orig_width)
            result = self.paste_upscale(fake_frame, fake_frame, target_face.matrix, frame, scale_factor, mask_offsets)
        else:
            result = self.paste_upscale(fake_frame, enhanced_frame, target_face.matrix, frame, scale_factor, mask_offsets)

        if rotation_action is not None:
            # The face was processed on a rotated cutout: rotate it back and
            # paste it into the copy of the full frame.
            fake_frame = self.auto_unrotate_frame(result, rotation_action)
            return self.paste_simple(fake_frame, saved_frame, startX, startY)

        return result

    def cutout(self, frame:Frame, start_x, start_y, end_x, end_y):
        """Return the given region of ``frame`` plus the coordinates clamped to the frame size."""
        if start_x < 0:
            start_x = 0
        if start_y < 0:
            start_y = 0
        if end_x > frame.shape[1]:
            end_x = frame.shape[1]
        if end_y > frame.shape[0]:
            end_y = frame.shape[0]
        return frame[start_y:end_y, start_x:end_x], start_x, start_y, end_x, end_y

    def paste_simple(self, src:Frame, dest:Frame, start_x, start_y):
        """Copy ``src`` into ``dest`` at ``(start_x, start_y)`` and return ``dest`` (modified in place)."""
        end_x = start_x + src.shape[1]
        end_y = start_y + src.shape[0]

        start_x, end_x, start_y, end_y = clamp_cut_values(start_x, end_x, start_y, end_y, dest)
        dest[start_y:end_y, start_x:end_x] = src
        return dest

    def simple_blend_with_mask(self, image1, image2, mask):
        """Return ``image1 * (1 - mask) + image2 * mask`` as ``uint8``."""
        # Blend the images
        blended_image = image1.astype(np.float32) * (1.0 - mask) + image2.astype(np.float32) * mask
        return blended_image.astype(np.uint8)

    def paste_upscale(self, fake_face, upsk_face, M, target_img, scale_factor, mask_offsets):
        """Warp the processed face back into ``target_img`` and blend it with a soft matte.

        Args:
            fake_face: processed face image.
            upsk_face: enhanced face image, or the same object as ``fake_face``.
            M: affine matrix of the aligned crop; scaled by ``scale_factor`` and inverted here.
            target_img: image the face is pasted into.
            scale_factor: factor applied to ``M``.
            mask_offsets: indices 0-3 are used as top, bottom, left and right
                fractions of the matte, indices 4 and 5 are passed to ``blur_area()``.

        Returns:
            The blended image as ``uint8``.
        """
        M_scale = M * scale_factor
        IM = cv2.invertAffineTransform(M_scale)

        face_matte = np.full((target_img.shape[0], target_img.shape[1]), 255, dtype=np.uint8)
        # Generate white square sized as a upsk_face
        img_matte = np.zeros((upsk_face.shape[0], upsk_face.shape[1]), dtype=np.uint8)

        w = img_matte.shape[1]
        h = img_matte.shape[0]

        top = int(mask_offsets[0] * h)
        bottom = int(h - (mask_offsets[1] * h))
        left = int(mask_offsets[2] * w)
        right = int(w - (mask_offsets[3] * w))
        img_matte[top:bottom, left:right] = 255

        # Transform white square back to target_img
        img_matte = cv2.warpAffine(img_matte, IM, (target_img.shape[1], target_img.shape[0]), flags=cv2.INTER_NEAREST, borderValue=0.0)
        ##Blacken the edges of face_matte by 1 pixels (so the mask in not expanded on the image edges)
        img_matte[:1, :] = img_matte[-1:, :] = img_matte[:, :1] = img_matte[:, -1:] = 0

        img_matte = self.blur_area(img_matte, mask_offsets[4], mask_offsets[5])
        #Normalize images to float values and reshape
        img_matte = img_matte.astype(np.float32) / 255
        face_matte = face_matte.astype(np.float32) / 255
        img_matte = np.minimum(face_matte, img_matte)
        if self.options.show_face_area_overlay:
            # Additional steps for green overlay
            green_overlay = np.zeros_like(target_img)
            green_color = [0, 255, 0]  # RGB for green
            for i in range(3):  # Apply green color where img_matte is not zero
                green_overlay[:, :, i] = np.where(img_matte > 0, green_color[i], 0)

        ##Transform upscaled face back to target_img
        img_matte = np.reshape(img_matte, [img_matte.shape[0], img_matte.shape[1], 1])
        paste_face = cv2.warpAffine(upsk_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
        if upsk_face is not fake_face:
            fake_face = cv2.warpAffine(fake_face, IM, (target_img.shape[1], target_img.shape[0]), borderMode=cv2.BORDER_REPLICATE)
            paste_face = cv2.addWeighted(paste_face, self.options.blend_ratio, fake_face, 1.0 - self.options.blend_ratio, 0)

        # Re-assemble image
        paste_face = img_matte * paste_face
        paste_face = paste_face + (1 - img_matte) * target_img.astype(np.float32)
        if self.options.show_face_area_overlay:
            # Overlay the green overlay on the final image
            paste_face = cv2.addWeighted(paste_face.astype(np.uint8), 1 - 0.5, green_overlay, 0.5, 0)
        return paste_face.astype(np.uint8)

    def blur_area(self, img_matte, num_erosion_iterations, blur_amount):
        """Erode and Gaussian-blur the white (255) area of ``img_matte``.

        Kernel sizes are derived from the extent of the pixels equal to 255
        and from ``blur_amount``.

        Returns:
            The softened matte, or ``img_matte`` unchanged when it contains no
            pixel with value 255.
        """
        # Detect the affine transformed white area
        mask_h_inds, mask_w_inds = np.where(img_matte == 255)
        if mask_h_inds.size == 0:
            # No fully white pixel: there is no area to measure and np.max()
            # would raise on the empty index arrays.
            return img_matte
        # Calculate the size (and diagonal size) of transformed white area width and height boundaries
        mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
        mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
        mask_size = int(np.sqrt(mask_h * mask_w))
        # Calculate the kernel size for eroding img_matte by kernel (insightface empirical guess for best size was max(mask_size//10,10))
        # k = max(mask_size//12, 8)
        half_blur = blur_amount // 2
        # max(..., 1) only guards the divisor against a blur_amount below 2.
        k = max(mask_size // max(half_blur, 1), half_blur)
        kernel = np.ones((k, k), np.uint8)
        img_matte = cv2.erode(img_matte, kernel, iterations=num_erosion_iterations)
        #Calculate the kernel size for blurring img_matte by blur_size (insightface empirical guess for best size was max(mask_size//20, 5))
        # k = max(mask_size//24, 4)
        k = max(mask_size // max(blur_amount, 1), blur_amount // 5)
        kernel_size = (k, k)
        # GaussianBlur needs odd kernel dimensions.
        blur_size = tuple(2 * i + 1 for i in kernel_size)
        return cv2.GaussianBlur(img_matte, blur_size, 0)

    def process_mask(self, processor, frame:Frame, target:Frame):
        """Blend ``frame`` into ``target`` using the mask returned by ``processor``.

        With ``options.show_face_masking`` set, ``frame`` multiplied by the
        inverted mask is returned instead.
        """
        img_mask = processor.Run(frame, self.options.masking_text)
        img_mask = cv2.resize(img_mask, (target.shape[1], target.shape[0]))
        img_mask = np.reshape(img_mask, [img_mask.shape[0], img_mask.shape[1], 1])

        if self.options.show_face_masking:
            result = (1 - img_mask) * frame.astype(np.float32)
            return np.uint8(result)

        target = target.astype(np.float32)
        result = (1 - img_mask) * target
        result += img_mask * frame.astype(np.float32)
        return np.uint8(result)

    @staticmethod
    def unload_models():
        """Placeholder, does nothing.

        Declared static so it can be called on the class as before and also
        on an instance (the plain function raised TypeError there).
        """
        pass

    def release_resources(self):
        """Release every processor and empty the processor list."""
        for p in self.processors:
            p.Release()
        self.processors.clear()