Spaces:
Runtime error
Runtime error
| import gc | |
| import os | |
| from pathlib import Path | |
| import cv2 | |
| from PIL import Image | |
| from tqdm import tqdm | |
| from .preprocess_dir.utils import crop_with_fan as cwf | |
| from .preprocess_dir.utils import face_finder as ff | |
| from .util import * | |
# Preprocess the template video.
# Same functionality as preprocess_template_old (the legacy function), with reduced memory usage.
def preprocess_template(
    config_path,
    template_video_path,
    reference_face,
    work_root_path,
    device,
    template_frame_ratio=1.0,
    template_video_ratio=None,
    callback=None,
    verbose=False,
    save_frames=True,
    silent_video_path=None,
    no_infer_frames=None,
):
    """Preprocess a template video.

    The function runs three stages:

    1. If the crop output for this template does not exist yet, locate the
       reference face in the template video and crop the face region
       (FAN landmark based). The face finder and FAN models are initialised
       on ``device`` for this stage and always released afterwards, even if
       a step fails.
    2. Optionally dump the template (and silent) video frames to disk.
    3. For ``.mov`` / ``.webm`` templates, write resized copies of the
       template (and silent) video for the requested ratios.

    Parameters
    ----------
    config_path (str) : path of the configuration file
    template_video_path (str) : path of the template video
    reference_face (str) : path of the reference face image
    work_root_path (str) : working directory; preprocessing results are
        stored below it
    device (str) : device specification, e.g. ``cuda:0``
    template_frame_ratio (float) : resize ratio used when saving frames.
        1.0 uses the original size
    template_video_ratio (list[float] | None) : resize ratios used when
        saving resized videos. ``None`` (default) means ``[1.0]``
    callback : progress callback forwarded to ``callback_inter``
    verbose (bool) : print debug information
    save_frames (bool) : whether to save the template video frames
    silent_video_path (str | None) : optional silent video that receives the
        same frame / ratio processing as the template
    no_infer_frames (list[tuple[int, int]] | None) : frame ranges that are
        not used for inference; start inclusive, end exclusive.
        ``None`` (default) means no excluded ranges

    Returns
    -------
    bool : True when the crop output was missing and the models were
        initialised during this call, False when the existing crop output
        was reused.
    """
    # Build fresh containers per call instead of sharing mutable defaults.
    if template_video_ratio is None:
        template_video_ratio = [1.0]
    if no_infer_frames is None:
        no_infer_frames = []

    load_gpu = False
    config = read_config(config_path)
    image_size = config.img_size

    # One progress reporter per stage; presumably callback_inter maps each
    # stage's own progress onto the [min_per, max_per] slice of the overall
    # progress -- confirm against its definition.
    callback1 = callback_inter(
        callback, min_per=0, max_per=2, desc="preprocess_template 1", verbose=verbose
    )
    callback2 = callback_inter(
        callback, min_per=2, max_per=20, desc="preprocess_template 2", verbose=verbose
    )
    callback3 = callback_inter(
        callback, min_per=20, max_per=100, desc="preprocess_template 3", verbose=verbose
    )

    preprocess_dir = get_preprocess_dir(work_root_path, config.name)
    Path(preprocess_dir).mkdir(exist_ok=True, parents=True)

    # snow : for debug
    if verbose:
        print("preprocess_dir: ", preprocess_dir, ", work_root_path:", work_root_path)

    # Path of the preprocessed (cropped) output.
    crop_mp4 = get_crop_mp4_dir(preprocess_dir, template_video_path)

    if not Path(crop_mp4).exists():
        load_gpu = True
        ff.init_face_finder(device)
        cwf.init_fan(device)
        try:
            if verbose:
                # NOTE(review): the message text looks like mis-encoded Korean
                # (likely "processing template video"); kept as found.
                print("ํ ํ๋ฆฟ ๋น๋์ค ์ฒ๋ฆฌ ... ")

            # Get the face information of the reference (announcer) image.
            df_face, _ = ff.find_face(reference_face)
            callback1(100)  # report progress
            g_anchor_ebd = df_face["ebd"].values[0]

            # Store only the positions of the announcer's face in the template video.
            df_paths = ff.save_face_info3(
                template_video_path,
                g_anchor_ebd,
                config.move,
                base=preprocess_dir,
                callback=callback2,
                verbose=verbose,
            )

            # Crop the face region based on FAN landmarks.
            assert len(df_paths) == 1
            if config.move:
                if verbose:
                    print("cwf.save_crop_info_move --")
                df_fan_path = cwf.save_crop_info_move(
                    image_size=image_size,
                    anchor_box_path=df_paths[0],
                    mp4_path=template_video_path,
                    out_dir=crop_mp4,
                    crop_offset_y=config.crop_offset_y,
                    crop_margin=config.crop_margin,
                    callback=callback3,
                    verbose=verbose,
                )
            else:
                if verbose:
                    print("cwf.save_crop_info2 --")
                df_fan_path = cwf.save_crop_info2(
                    image_size=image_size,
                    anchor_box_path=df_paths[0],
                    mp4_path=template_video_path,
                    out_dir=crop_mp4,
                    crop_offset_y=config.crop_offset_y,
                    crop_margin=config.crop_margin,
                    no_infer_frames=no_infer_frames,
                    callback=callback3,
                    verbose=verbose,
                )

            # snow : for debug
            if verbose:
                print("df_fan_path: ", df_fan_path)
        finally:
            # Release the models even when a step above raised, so a failed
            # run does not keep them loaded.
            ff.del_face_finder()
            cwf.del_fan()
    else:
        if verbose:
            # NOTE(review): mis-encoded Korean (likely "preprocessing already
            # done"); kept as found.
            print("์ ์ฒ๋ฆฌ๊ฐ ์ด๋ฏธ ๋์ด์์")
        # NOTE(review): indentation was lost in the reviewed source; this call
        # is assumed to belong to the "already preprocessed" branch -- confirm.
        callback3(100)

    # 1. save frames for stf
    if save_frames:
        frame_dir = get_frame_dir(
            preprocess_dir, template_video_path, ratio=template_frame_ratio
        )
        if verbose:
            print("frame_dir:", frame_dir)
        save_template_frames(
            template_video_path=template_video_path,
            template_frames_path=frame_dir,
            ratio=template_frame_ratio,
            save_in_video=False,
            verbose=verbose,
        )
        # NOTE(review): assumed to be nested under ``save_frames`` -- confirm.
        if silent_video_path is not None:
            frame_dir = get_frame_dir(
                preprocess_dir, silent_video_path, ratio=template_frame_ratio
            )
            save_template_frames(
                template_video_path=silent_video_path,
                template_frames_path=frame_dir,
                ratio=template_frame_ratio,
                save_in_video=False,
                verbose=verbose,
            )

    if template_video_path.endswith(".mov"):
        # TODO snow : performance needs to be checked.
        # For now a resized file is only stored for mov input; reportedly
        # because inference is slow otherwise. by hojin
        # 2. save video for encoding
        for video_ratio in template_video_ratio:
            if video_ratio != 1.0:
                out_path = get_template_ratio_file_path(
                    preprocess_dir, template_video_path, ratio=video_ratio
                )
                save_template_frames(
                    template_video_path=template_video_path,
                    template_frames_path="",
                    template_video_path_with_ratio=out_path,
                    ratio=video_ratio,
                    save_in_video=True,
                    verbose=verbose,
                )
                # NOTE(review): assumed to be nested under the
                # ``video_ratio != 1.0`` check -- confirm against the
                # properly indented original.
                if silent_video_path is not None:
                    out_path = get_template_ratio_file_path(
                        preprocess_dir, silent_video_path, ratio=video_ratio
                    )
                    save_template_webm_ratio(
                        template_video_path=silent_video_path,
                        ratio=video_ratio,
                        out_path=out_path,
                        verbose=verbose,
                    )

    if template_video_path.endswith(".webm"):
        # TODO snow : performance needs to be checked. One webm is created per ratio.
        for video_ratio in template_video_ratio:
            out_path = get_template_ratio_file_path(
                preprocess_dir, template_video_path, ratio=video_ratio
            )
            save_template_webm_ratio(
                template_video_path=template_video_path,
                ratio=video_ratio,
                out_path=out_path,
                verbose=verbose,
            )
            if silent_video_path is not None:
                out_path = get_template_ratio_file_path(
                    preprocess_dir, silent_video_path, ratio=video_ratio
                )
                save_template_webm_ratio(
                    template_video_path=silent_video_path,
                    ratio=video_ratio,
                    out_path=out_path,
                    verbose=verbose,
                )

    gc.collect()
    return load_gpu
# snow: function that resizes a webm template for a given ratio and saves it
def save_template_webm_ratio(template_video_path, ratio, out_path, verbose):
    """Write a resized copy of a 4-channel template video to ``out_path``.

    Nothing is done when ``out_path`` already exists. Otherwise every frame
    of ``template_video_path`` is read as 4-channel uint8 data, resized to
    ``ratio`` times the source size (each dimension rounded and forced to an
    even number) and sent to the writer returned by
    ``get_webm_ffmpeg_writer``.

    Parameters
    ----------
    template_video_path (str) : source video; also passed to the writer as
        ``wav_path``
    ratio (float) : resize ratio relative to the source size
    out_path (str) : destination file
    verbose (bool) : print debug information

    Returns
    -------
    None
    """

    def inter_alg_(w, h, img):
        # Area interpolation when the pixel count shrinks, cubic otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        # The flag must be passed by keyword: the third positional parameter
        # of cv2.resize is ``dst``, so a positional flag is not used as the
        # interpolation method.
        return cv2.resize(img, (w, h), interpolation=inter_alg_(w, h, img))

    # Check for an existing result before opening any reader, so the early
    # return does not leave an opened reader behind.
    if Path(out_path).exists():
        if verbose:
            # NOTE(review): mis-encoded Korean (likely "ratio file is already
            # saved"); kept as found.
            print(f"ratio ํ์ผ์ด ์ ์ฅ๋์ด ์์, {out_path}")
        return

    out_dir = os.path.dirname(out_path)
    if out_dir:  # a bare file name has no directory to create
        os.makedirs(out_dir, exist_ok=True)

    if verbose:
        print(f"webm ratio template, org:{template_video_path}, ratio:{ratio}")

    reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    size_org = meta["size"]
    # Round to the nearest pixel, then force even dimensions.
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]

    writer = get_webm_ffmpeg_writer(
        out_path, size=size, fps=meta["fps"], wav_path=template_video_path
    )
    try:
        writer.send(None)  # seed the generator
        total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)
        for frame in tqdm(
            reader, total=total_cnt, desc=f"save webm ratio:{ratio}, size:{size}"
        ):
            frame = np.frombuffer(frame, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], 4)
            writer.send(resize_(size, frame))
    finally:
        # Always close the writer, also when a frame fails.
        writer.close()
# hojin
# Extract PNG frames + crop
def save_template_frames(
    template_video_path,
    template_frames_path,
    template_video_path_with_ratio=None,
    ratio=1.0,
    save_in_video=False,
    verbose=False,
):
    """Dump the frames of a template video, or re-encode it at ``ratio``.

    Frame mode (``save_in_video`` false): every frame is resized and stored
    in ``template_frames_path`` as ``00000.webp``, ``00001.webp``, ...
    Nothing is done when that directory already exists.
    NOTE(review): the files carry a ``.webp`` name but are saved with
    ``format="png"``; kept unchanged because readers may rely on these names.
    NOTE(review): a directory left behind by an interrupted run is treated
    as complete by the existence check.

    Video mode (``save_in_video`` true): the resized frames are written to
    ``template_video_path_with_ratio`` with the ``png`` codec. Nothing is
    done when that file already exists, and nothing is written for
    ``ratio == 1.0``.

    Parameters
    ----------
    template_video_path (str) : source video. ``.mov`` / ``.webm`` are read
        as 4-channel frames, everything else as 3-channel frames
    template_frames_path (str) : output directory for frame mode
    template_video_path_with_ratio (str | None) : output file for video mode
    ratio (float) : resize ratio; each dimension is rounded and forced to an
        even number
    save_in_video (bool) : select video mode instead of frame mode
    verbose (bool) : print debug information

    Returns
    -------
    None
    """

    def inter_alg_(w, h, img):
        # Area interpolation when the pixel count shrinks, cubic otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        # The flag must be passed by keyword: the third positional parameter
        # of cv2.resize is ``dst``, so a positional flag is not used as the
        # interpolation method.
        return cv2.resize(img, (w, h), interpolation=inter_alg_(w, h, img))

    save_in_video = bool(save_in_video)

    # hojin: store the template frame by frame so that write_video_in_thread
    # does not need to use a reader.
    if not save_in_video:
        if Path(template_frames_path).exists():
            if verbose:
                # NOTE(review): mis-encoded Korean (likely "all frames are
                # already saved"); kept as found.
                print("ํ๋ ์์ด ๋ชจ๋ ์ ์ฅ๋์ด ์์")
            return
    else:
        if Path(template_video_path_with_ratio).exists():
            if verbose:
                # NOTE(review): mis-encoded Korean (likely "video is already
                # created"); kept as found.
                print("๋น๋์ค๊ฐ ์์ฑ๋์ด ์์")
            return
        os.makedirs(os.path.dirname(template_video_path_with_ratio), exist_ok=True)
        if ratio == 1.0:
            # No writer is ever created at ratio 1.0, so decoding the whole
            # video would produce nothing.
            return

    # Derive the channel count once so reader, reshape, PIL mode and writer
    # input format always agree.
    four_channel = template_video_path.endswith((".mov", ".webm"))
    if four_channel:
        reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    else:  # mp4 and anything else
        reader, meta = get_three_channel_ffmpeg_reader(template_video_path)
    channels = 4 if four_channel else 3
    image_mode = "RGBA" if four_channel else "RGB"

    size_org = meta["size"]
    # Round to the nearest pixel, then force even dimensions.
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]
    fps = meta["fps"]
    if verbose:
        print(meta)

    total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)

    if not save_in_video:
        Path(template_frames_path).mkdir(exist_ok=True, parents=True)

    if verbose:
        print("template_frames_path: ", template_frames_path)

    progress_desc = f"save video f{ratio}" if save_in_video else f"save frames f{ratio}"

    # hojin: re-encode the extracted frames into a video for export (ratio < 1.0).
    writer = None
    try:
        for idx, frame in tqdm(enumerate(reader), total=total_cnt, desc=progress_desc):
            frame = np.frombuffer(frame, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], channels)
            frame = resize_(size, frame)

            if save_in_video:
                if writer is None:
                    # Created on the first frame, so an empty input produces
                    # no output file.
                    writer = imageio_ffmpeg.write_frames(
                        template_video_path_with_ratio,
                        size=size,
                        fps=fps,
                        quality=10,
                        pix_fmt_in="rgba" if channels == 4 else "rgb24",
                        pix_fmt_out="rgba",
                        codec="png",
                        macro_block_size=1,
                    )
                    writer.send(None)  # seed the generator
                writer.send(frame)
            else:
                name = f"""{idx:05d}.webp"""
                image = Image.fromarray(np.ascontiguousarray(frame), mode=image_mode)
                image.save(
                    str(Path(template_frames_path) / str(name)),
                    format="png",
                    lossless=True,
                )
    finally:
        # Always close the writer, also when a frame fails.
        if writer is not None:
            writer.close()
| # hojin end | |