# -*- coding: utf-8 -*-
# @Author : wenshao
# @Email : wenshaoguo1026@gmail.com
# @Project : FasterLivePortrait
# @FileName: run.py
"""
# drive with a video
python run.py \
--src_image assets/examples/driving/d13.mp4 \
--dri_video assets/examples/driving/d11.mp4 \
--cfg configs/trt_infer.yaml \
--paste_back \
--animal
# drive with a saved motion template (.pkl)
python run.py \
--src_image assets/examples/source/s12.jpg \
--dri_video ./results/2024-09-13-081710/d0.mp4.pkl \
--cfg configs/trt_infer.yaml \
--paste_back \
--animal
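# realtime: webcam 0 is used as the driver when --dri_video is omitted or missing
python run.py \
--src_image assets/examples/source/s12.jpg \
--cfg configs/trt_infer.yaml \
--realtime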
"""
import os
import argparse
import subprocess
import cv2
import time
import numpy as np
import datetime
import platform
import pickle
from omegaconf import OmegaConf
from tqdm import tqdm
from colorama import Fore, Style
from src.pipelines.faster_live_portrait_pipeline import FasterLivePortraitPipeline
from src.utils.utils import video_has_audio
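# use the ffmpeg build bundled with the repo on Windows; otherwise assume ffmpeg is on PATH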
if platform.system().lower() == 'windows':
FFMPEG = "third_party/ffmpeg-7.0.1-full_build/bin/ffmpeg.exe"
else:
FFMPEG = "ffmpeg"
def run_with_video(args):
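    """Animate the source with a driving video, or with the webcam when no driving video is available."""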
    print(Fore.RED + 'Render, Q > exit, S > Stitching, Z > RelativeMotion, X > AnimationRegion, '
                     'C > CropDrivingVideo, K/L > AdjustSourceScale, N/M > AdjustDriverScale, '
                     'Space > Webcam as source, R > SwitchRealtimeWebcamUpdate' + Style.RESET_ALL)
infer_cfg = OmegaConf.load(args.cfg)
infer_cfg.infer_params.flag_pasteback = args.paste_back
pipe = FasterLivePortraitPipeline(cfg=infer_cfg, is_animal=args.animal)
ret = pipe.prepare_source(args.src_image, realtime=args.realtime)
if not ret:
print(f"no face in {args.src_image}! exit!")
exit(1)
if not args.dri_video or not os.path.exists(args.dri_video):
# read frame from camera if no driving video input
vcap = cv2.VideoCapture(0)
if not vcap.isOpened():
print("no camera found! exit!")
exit(1)
else:
vcap = cv2.VideoCapture(args.dri_video)
    fps = int(vcap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        # webcams may report 0 FPS; fall back to a default (assumed 25) so VideoWriter gets a valid rate
        fps = 25
h, w = pipe.src_imgs[0].shape[:2]
save_dir = f"./results/{datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')}"
os.makedirs(save_dir, exist_ok=True)
# render output video
if not args.realtime:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
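        # the -crop video holds the driving frame and the output side by side, hence the doubled width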
vsave_crop_path = os.path.join(save_dir,
f"{os.path.basename(args.src_image)}-{os.path.basename(args.dri_video)}-crop.mp4")
vout_crop = cv2.VideoWriter(vsave_crop_path, fourcc, fps, (512 * 2, 512))
vsave_org_path = os.path.join(save_dir,
f"{os.path.basename(args.src_image)}-{os.path.basename(args.dri_video)}-org.mp4")
vout_org = cv2.VideoWriter(vsave_org_path, fourcc, fps, (w, h))
infer_times = []
motion_lst = []
c_eyes_lst = []
c_lip_lst = []
frame_ind = 0
while vcap.isOpened():
ret, frame = vcap.read()
if not ret:
break
t0 = time.time()
first_frame = frame_ind == 0
dri_crop, out_crop, out_org, dri_motion_info = pipe.run(frame, pipe.src_imgs[0], pipe.src_infos[0],
first_frame=first_frame)
frame_ind += 1
if out_crop is None:
print(f"no face in driving frame:{frame_ind}")
continue
motion_lst.append(dri_motion_info[0])
c_eyes_lst.append(dri_motion_info[1])
c_lip_lst.append(dri_motion_info[2])
infer_times.append(time.time() - t0)
# print(time.time() - t0)
dri_crop = cv2.resize(dri_crop, (512, 512))
out_crop = np.concatenate([dri_crop, out_crop], axis=1)
out_crop = cv2.cvtColor(out_crop, cv2.COLOR_RGB2BGR)
if not args.realtime:
vout_crop.write(out_crop)
out_org = cv2.cvtColor(out_org, cv2.COLOR_RGB2BGR)
vout_org.write(out_org)
else:
if infer_cfg.infer_params.flag_pasteback:
out_org = cv2.cvtColor(out_org, cv2.COLOR_RGB2BGR)
cv2.imshow('Render', out_org)
else:
# image show in realtime mode
cv2.imshow('Render', out_crop)
        # press 'q' to exit the loop
if cv2.waitKey(1) & 0xFF == ord('q'):
break
vcap.release()
if not args.realtime:
vout_crop.release()
vout_org.release()
if video_has_audio(args.dri_video):
vsave_crop_path_new = os.path.splitext(vsave_crop_path)[0] + "-audio.mp4"
subprocess.call(
[FFMPEG, "-i", vsave_crop_path, "-i", args.dri_video,
"-b:v", "10M", "-c:v",
"libx264", "-map", "0:v", "-map", "1:a",
"-c:a", "aac",
"-pix_fmt", "yuv420p", vsave_crop_path_new, "-y", "-shortest"])
vsave_org_path_new = os.path.splitext(vsave_org_path)[0] + "-audio.mp4"
subprocess.call(
[FFMPEG, "-i", vsave_org_path, "-i", args.dri_video,
"-b:v", "10M", "-c:v",
"libx264", "-map", "0:v", "-map", "1:a",
"-c:a", "aac",
"-pix_fmt", "yuv420p", vsave_org_path_new, "-y", "-shortest"])
print(vsave_crop_path_new)
print(vsave_org_path_new)
else:
print(vsave_crop_path)
print(vsave_org_path)
else:
cv2.destroyAllWindows()
        print("inference median time: {:.2f} ms/frame, mean time: {:.2f} ms/frame".format(
            np.median(infer_times) * 1000, np.mean(infer_times) * 1000))
# save driving motion to pkl
template_dct = {
'n_frames': len(motion_lst),
'output_fps': fps,
'motion': motion_lst,
'c_eyes_lst': c_eyes_lst,
'c_lip_lst': c_lip_lst,
}
template_pkl_path = os.path.join(save_dir,
f"{os.path.basename(args.dri_video)}.pkl")
with open(template_pkl_path, "wb") as fw:
pickle.dump(template_dct, fw)
print(f"save driving motion pkl file at : {template_pkl_path}")
def run_with_pkl(args):
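    """Animate the source image with a pre-extracted driving-motion template (.pkl)."""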
infer_cfg = OmegaConf.load(args.cfg)
infer_cfg.infer_params.flag_pasteback = args.paste_back
pipe = FasterLivePortraitPipeline(cfg=infer_cfg, is_animal=args.animal)
ret = pipe.prepare_source(args.src_image, realtime=args.realtime)
if not ret:
print(f"no face in {args.src_image}! exit!")
return
with open(args.dri_video, "rb") as fin:
dri_motion_infos = pickle.load(fin)
fps = int(dri_motion_infos["output_fps"])
h, w = pipe.src_imgs[0].shape[:2]
save_dir = f"./results/{datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')}"
os.makedirs(save_dir, exist_ok=True)
# render output video
if not args.realtime:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
vsave_crop_path = os.path.join(save_dir,
f"{os.path.basename(args.src_image)}-{os.path.basename(args.dri_video)}-crop.mp4")
vout_crop = cv2.VideoWriter(vsave_crop_path, fourcc, fps, (512, 512))
vsave_org_path = os.path.join(save_dir,
f"{os.path.basename(args.src_image)}-{os.path.basename(args.dri_video)}-org.mp4")
vout_org = cv2.VideoWriter(vsave_org_path, fourcc, fps, (w, h))
infer_times = []
motion_lst = dri_motion_infos["motion"]
    # fall back to the alternate c_d_* key names used by some templates
    c_eyes_lst = dri_motion_infos.get("c_eyes_lst", dri_motion_infos.get("c_d_eyes_lst"))
    c_lip_lst = dri_motion_infos.get("c_lip_lst", dri_motion_infos.get("c_d_lip_lst"))
frame_num = len(motion_lst)
for frame_ind in tqdm(range(frame_num)):
t0 = time.time()
first_frame = frame_ind == 0
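        # per-frame template entry: [motion, c_eyes, c_lip], the order saved by run_with_video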
dri_motion_info_ = [motion_lst[frame_ind], c_eyes_lst[frame_ind], c_lip_lst[frame_ind]]
out_crop, out_org = pipe.run_with_pkl(dri_motion_info_, pipe.src_imgs[0], pipe.src_infos[0],
first_frame=first_frame)
if out_crop is None:
print(f"no face in driving frame:{frame_ind}")
continue
infer_times.append(time.time() - t0)
# print(time.time() - t0)
out_crop = cv2.cvtColor(out_crop, cv2.COLOR_RGB2BGR)
if not args.realtime:
vout_crop.write(out_crop)
out_org = cv2.cvtColor(out_org, cv2.COLOR_RGB2BGR)
vout_org.write(out_org)
else:
if infer_cfg.infer_params.flag_pasteback:
out_org = cv2.cvtColor(out_org, cv2.COLOR_RGB2BGR)
                cv2.imshow('Render, Q > exit, S > Stitching, Z > RelativeMotion, X > AnimationRegion, C > CropDrivingVideo, K/L > AdjustSourceScale, N/M > AdjustDriverScale, Space > Webcam as source, R > SwitchRealtimeWebcamUpdate', out_org)
            else:
                # show the cropped result in realtime mode
                cv2.imshow('Render, Q > exit, S > Stitching, Z > RelativeMotion, X > AnimationRegion, C > CropDrivingVideo, K/L > AdjustSourceScale, N/M > AdjustDriverScale, Space > Webcam as source, R > SwitchRealtimeWebcamUpdate', out_crop)
            # hotkeys: 'q' exits; the remaining keys toggle or adjust the inference parameters listed in the window title
k = cv2.waitKey(1) & 0xFF
if k == ord('q'):
break
            # parameter-tweaking hotkeys
if k == ord('s'):
infer_cfg.infer_params.flag_stitching = not infer_cfg.infer_params.flag_stitching
print('flag_stitching:'+str(infer_cfg.infer_params.flag_stitching))
if k == ord('z'):
infer_cfg.infer_params.flag_relative_motion = not infer_cfg.infer_params.flag_relative_motion
print('flag_relative_motion:'+str(infer_cfg.infer_params.flag_relative_motion))
if k == ord('x'):
                if infer_cfg.infer_params.animation_region == "all":
                    infer_cfg.infer_params.animation_region = "exp"
                else:
                    infer_cfg.infer_params.animation_region = "all"
                print('animation_region:' + str(infer_cfg.infer_params.animation_region))
if k == ord('c'):
infer_cfg.infer_params.flag_crop_driving_video = not infer_cfg.infer_params.flag_crop_driving_video
print('flag_crop_driving_video:'+str(infer_cfg.infer_params.flag_crop_driving_video))
if k == ord('v'):
infer_cfg.infer_params.flag_pasteback = not infer_cfg.infer_params.flag_pasteback
print('flag_pasteback:'+str(infer_cfg.infer_params.flag_pasteback))
if k == ord('a'):
infer_cfg.infer_params.flag_normalize_lip = not infer_cfg.infer_params.flag_normalize_lip
print('flag_normalize_lip:'+str(infer_cfg.infer_params.flag_normalize_lip))
if k == ord('d'):
infer_cfg.infer_params.flag_source_video_eye_retargeting = not infer_cfg.infer_params.flag_source_video_eye_retargeting
print('flag_source_video_eye_retargeting:'+str(infer_cfg.infer_params.flag_source_video_eye_retargeting))
if k == ord('f'):
infer_cfg.infer_params.flag_video_editing_head_rotation = not infer_cfg.infer_params.flag_video_editing_head_rotation
print('flag_video_editing_head_rotation:'+str(infer_cfg.infer_params.flag_video_editing_head_rotation))
if k == ord('g'):
infer_cfg.infer_params.flag_eye_retargeting = not infer_cfg.infer_params.flag_eye_retargeting
print('flag_eye_retargeting:'+str(infer_cfg.infer_params.flag_eye_retargeting))
if k == ord('k'):
infer_cfg.crop_params.src_scale -= 0.1
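                # re-crop the source so the new scale takes effect (assumes the pipeline reads crop_params from the shared cfg)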
ret = pipe.prepare_source(args.src_image, realtime=args.realtime)
print('src_scale:'+str(infer_cfg.crop_params.src_scale))
if k == ord('l'):
infer_cfg.crop_params.src_scale += 0.1
ret = pipe.prepare_source(args.src_image, realtime=args.realtime)
print('src_scale:'+str(infer_cfg.crop_params.src_scale))
if k == ord('n'):
infer_cfg.crop_params.dri_scale -= 0.1
print('dri_scale:'+str(infer_cfg.crop_params.dri_scale))
if k == ord('m'):
infer_cfg.crop_params.dri_scale += 0.1
print('dri_scale:'+str(infer_cfg.crop_params.dri_scale))
if not args.realtime:
vout_crop.release()
vout_org.release()
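        # a .pkl driver has no audio track, so this muxing branch is normally skipped and the paths are just printed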
if video_has_audio(args.dri_video):
vsave_crop_path_new = os.path.splitext(vsave_crop_path)[0] + "-audio.mp4"
subprocess.call(
[FFMPEG, "-i", vsave_crop_path, "-i", args.dri_video,
"-b:v", "10M", "-c:v",
"libx264", "-map", "0:v", "-map", "1:a",
"-c:a", "aac",
"-pix_fmt", "yuv420p", vsave_crop_path_new, "-y", "-shortest"])
vsave_org_path_new = os.path.splitext(vsave_org_path)[0] + "-audio.mp4"
subprocess.call(
[FFMPEG, "-i", vsave_org_path, "-i", args.dri_video,
"-b:v", "10M", "-c:v",
"libx264", "-map", "0:v", "-map", "1:a",
"-c:a", "aac",
"-pix_fmt", "yuv420p", vsave_org_path_new, "-y", "-shortest"])
print(vsave_crop_path_new)
print(vsave_org_path_new)
else:
print(vsave_crop_path)
print(vsave_org_path)
else:
cv2.destroyAllWindows()
        print("inference median time: {:.2f} ms/frame, mean time: {:.2f} ms/frame".format(
            np.median(infer_times) * 1000, np.mean(infer_times) * 1000))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Faster Live Portrait Pipeline')
    parser.add_argument('--src_image', required=False, type=str, default="assets/examples/source/s12.jpg",
                        help='source image or video')
    parser.add_argument('--dri_video', required=False, type=str, default="assets/examples/driving/d14.mp4",
                        help='driving video or .pkl motion template')
parser.add_argument('--cfg', required=False, type=str, default="configs/onnx_infer.yaml", help='inference config')
parser.add_argument('--realtime', action='store_true', help='realtime inference')
parser.add_argument('--animal', action='store_true', help='use animal model')
    parser.add_argument('--paste_back', action='store_true', help='paste the animated crop back onto the original image')
args, unknown = parser.parse_known_args()
if args.dri_video.endswith(".pkl"):
run_with_pkl(args)
else:
run_with_video(args)