ganeshblank's picture
assets
30f37fa verified
raw
history blame
8.61 kB
# coding: utf-8
import os.path as osp
from dataclasses import dataclass, field
from typing import List, Tuple, Union
import cv2
import numpy as np
# Passing 0 tells OpenCV to disable its threading optimizations and run sequentially.
# NOTE(review): presumably to avoid thread oversubscription next to the inference runtimes — confirm.
cv2.setNumThreads(0)
# Disable OpenCV's OpenCL code paths for this process.
cv2.ocl.setUseOpenCL(False)
from ..config.crop_config import CropConfig
from .crop import (
average_bbox_lst,
crop_image,
crop_image_by_bbox,
parse_bbox_from_landmark,
)
from .face_analysis_diy import FaceAnalysisDIY
from .io import contiguous
from .landmark_runner import LandmarkRunner
from .rprint import rlog as log
def make_abs_path(fn):
    """Return ``fn`` joined onto the directory that contains this source file.

    The path of this module is resolved with ``realpath`` before its directory
    is taken, so the result is anchored at the module's real location.
    """
    module_dir = osp.dirname(osp.realpath(__file__))
    return osp.join(module_dir, fn)
# Container type shared by every per-frame sequence stored on a Trajectory.
_TrajectorySeq = Union[Tuple, List, np.ndarray]


@dataclass
class Trajectory:
    """Tracking state for one clip: a frame range plus per-frame sequences.

    Every sequence field defaults to its own fresh empty list, so instances
    never share storage.
    """

    # Frame range; both indices default to -1.
    start: int = -1  # start frame
    end: int = -1  # end frame

    # Per-frame data for the original frames.
    lmk_lst: _TrajectorySeq = field(default_factory=list)  # landmark list
    bbox_lst: _TrajectorySeq = field(default_factory=list)  # bbox list
    frame_rgb_lst: _TrajectorySeq = field(default_factory=list)  # frame list

    # Per-frame data for the cropped frames.
    lmk_crop_lst: _TrajectorySeq = field(default_factory=list)  # cropped landmark list
    frame_rgb_crop_lst: _TrajectorySeq = field(default_factory=list)  # frame crop list
class Cropper(object):
    """Crop faces from a source image or a driving video.

    Combines a ``FaceAnalysisDIY`` detector, which supplies 106-point
    landmarks, with a ``LandmarkRunner`` that is run on a frame together with
    an existing landmark estimate. Exposes helpers to crop a single source
    image, to crop a driving video along a tracked trajectory, and to compute
    landmarks for an already-cropped video.
    """

    def __init__(self, **kwargs) -> None:
        """Create and warm up the landmark runner and the face analysis wrapper.

        Keyword Args:
            crop_cfg: crop configuration; ``landmark_ckpt_path`` and
                ``insightface_root`` are read from it and resolved relative to
                this module's directory via ``make_abs_path``.
            device_id: device index passed to both models (default ``0``).
            flag_force_cpu: use the CPU providers instead of the CUDA ones
                (default ``False``).
        """
        self.crop_cfg: "CropConfig" = kwargs.get("crop_cfg", None)
        device_id = kwargs.get("device_id", 0)
        flag_force_cpu = kwargs.get("flag_force_cpu", False)
        if flag_force_cpu:
            device = "cpu"
            face_analysis_providers = ["CPUExecutionProvider"]
        else:
            device = "cuda"
            face_analysis_providers = ["CUDAExecutionProvider"]

        self.landmark_runner = LandmarkRunner(
            ckpt_path=make_abs_path(self.crop_cfg.landmark_ckpt_path),
            onnx_provider=device,
            device_id=device_id,
        )
        self.landmark_runner.warmup()

        self.face_analysis_wrapper = FaceAnalysisDIY(
            name="buffalo_l",
            root=make_abs_path(self.crop_cfg.insightface_root),
            providers=face_analysis_providers,
        )
        self.face_analysis_wrapper.prepare(ctx_id=device_id, det_size=(512, 512))
        self.face_analysis_wrapper.warmup()

    def update_config(self, user_args):
        """Overwrite attributes of ``self.crop_cfg`` from a mapping.

        Only keys that already exist as attributes on the config are applied;
        unknown keys are silently ignored.
        """
        for k, v in user_args.items():
            if hasattr(self.crop_cfg, k):
                setattr(self.crop_cfg, k, v)

    def crop_source_image(self, img_rgb_: np.ndarray, crop_cfg: "CropConfig"):
        """Crop the face out of a source image and collect the related data.

        Args:
            img_rgb_: source image in RGB channel order; it is copied, not modified.
            crop_cfg: supplies ``direction``, ``max_face_num``, ``dsize``,
                ``scale``, ``vx_ratio`` and ``vy_ratio``.

        Returns:
            ``None`` when no face is detected. Otherwise the result of
            ``crop_image`` extended with ``"lmk_crop"``, ``"img_crop_256x256"``
            and ``"lmk_crop_256x256"``.
        """
        # crop a source image and get necessary information
        img_rgb = img_rgb_.copy()  # work on a private copy
        img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
        src_face = self.face_analysis_wrapper.get(
            img_bgr,
            flag_do_landmark_2d_106=True,
            direction=crop_cfg.direction,
            max_face_num=crop_cfg.max_face_num,
        )

        if len(src_face) == 0:
            log("No face detected in the source image.")
            return None
        elif len(src_face) > 1:
            log(
                f"More than one face detected in the image, only pick one face by rule {crop_cfg.direction}."
            )

        # NOTE: temporarily only pick the first face, to support multiple face in the future
        src_face = src_face[0]
        lmk = src_face.landmark_2d_106  # this is the 106 landmarks from insightface

        # crop the face
        ret_dct = crop_image(
            img_rgb,  # ndarray
            lmk,  # 106x2 or Nx2
            dsize=crop_cfg.dsize,
            scale=crop_cfg.scale,
            vx_ratio=crop_cfg.vx_ratio,
            vy_ratio=crop_cfg.vy_ratio,
        )

        # NOTE(review): these landmarks come from the uncropped image but are stored
        # under "lmk_crop" — confirm the coordinate frame downstream code expects.
        lmk = self.landmark_runner.run(img_rgb, lmk)
        ret_dct["lmk_crop"] = lmk

        # update a 256x256 version for network input
        ret_dct["img_crop_256x256"] = cv2.resize(
            ret_dct["img_crop"], (256, 256), interpolation=cv2.INTER_AREA
        )
        ret_dct["lmk_crop_256x256"] = ret_dct["lmk_crop"] * 256 / crop_cfg.dsize
        return ret_dct

    def _detect_initial_lmk(self, frame_rgb, idx, direction):
        """Detect a face in ``frame_rgb`` and return its landmarks, or ``None`` if no face is found."""
        src_face = self.face_analysis_wrapper.get(
            contiguous(frame_rgb[..., ::-1]),  # reverse the channel axis: RGB -> BGR
            flag_do_landmark_2d_106=True,
            direction=direction,
        )
        if len(src_face) == 0:
            log(f"No face detected in the frame #{idx}")
            return None
        if len(src_face) > 1:
            log(
                f"More than one face detected in the driving frame_{idx}, only pick one face by rule {direction}."
            )
        # only the first face returned by the detector is tracked
        lmk = src_face[0].landmark_2d_106
        return self.landmark_runner.run(frame_rgb, lmk)

    def crop_driving_video(self, driving_rgb_lst, **kwargs):
        """Tracking based landmarks/alignment and cropping.

        Frames are skipped until a face is first detected. After that each
        frame's landmarks are computed from the frame and the previous frame's
        landmarks. All tracked frames are then cropped with one bounding box
        obtained from ``average_bbox_lst`` over the per-frame boxes.

        Args:
            driving_rgb_lst: iterable of RGB frames.

        Keyword Args:
            direction: face selection rule passed to the detector
                (default ``"large-small"``).
            dsize: output size passed to ``crop_image_by_bbox`` (default ``512``).

        Returns:
            dict with ``"frame_crop_lst"`` and ``"lmk_crop_lst"``, one entry per
            tracked frame.
        """
        trajectory = Trajectory()
        direction = kwargs.get("direction", "large-small")
        dsize = kwargs.get("dsize", 512)

        for idx, frame_rgb in enumerate(driving_rgb_lst):
            if trajectory.start == -1:
                # tracking not started yet: bootstrap from the face detector
                lmk = self._detect_initial_lmk(frame_rgb, idx, direction)
                if lmk is None:
                    continue
                trajectory.start, trajectory.end = idx, idx
            else:
                lmk = self.landmark_runner.run(frame_rgb, trajectory.lmk_lst[-1])
                trajectory.end = idx

            trajectory.lmk_lst.append(lmk)
            # The horizontal ratio is passed as `vx_ratio`, mirroring the `vy_ratio`
            # keyword below and the `vx_ratio`/`vy_ratio` keywords given to crop_image;
            # it was previously sent under a keyword that did not follow that naming.
            # NOTE(review): confirm against the signature in crop.py.
            ret_bbox = parse_bbox_from_landmark(
                lmk,
                scale=self.crop_cfg.scale_crop_video,
                vx_ratio=self.crop_cfg.vx_ratio_crop_video,
                vy_ratio=self.crop_cfg.vy_ratio_crop_video,
            )["bbox"]
            # keep corners 0 and 2 as a flat 4-vector
            # (presumably top-left and bottom-right — verify against crop.py)
            bbox = [
                ret_bbox[0, 0],
                ret_bbox[0, 1],
                ret_bbox[2, 0],
                ret_bbox[2, 1],
            ]  # 4,
            trajectory.bbox_lst.append(bbox)  # bbox
            trajectory.frame_rgb_lst.append(frame_rgb)

        # one shared box for the whole clip
        global_bbox = average_bbox_lst(trajectory.bbox_lst)

        for frame_rgb, lmk in zip(trajectory.frame_rgb_lst, trajectory.lmk_lst):
            ret_dct = crop_image_by_bbox(
                frame_rgb,
                global_bbox,
                lmk=lmk,
                dsize=dsize,
                flag_rot=False,
                borderValue=(0, 0, 0),
            )
            trajectory.frame_rgb_crop_lst.append(ret_dct["img_crop"])
            trajectory.lmk_crop_lst.append(ret_dct["lmk_crop"])

        return {
            "frame_crop_lst": trajectory.frame_rgb_crop_lst,
            "lmk_crop_lst": trajectory.lmk_crop_lst,
        }

    def calc_lmks_from_cropped_video(self, driving_rgb_crop_lst, **kwargs):
        """Tracking based landmarks/alignment.

        Args:
            driving_rgb_crop_lst: iterable of already-cropped RGB frames.

        Keyword Args:
            direction: face selection rule passed to the detector
                (default ``"large-small"``).

        Returns:
            list with one landmark entry per input frame.

        Raises:
            Exception: if no face is detected when tracking is bootstrapped.
        """
        trajectory = Trajectory()
        direction = kwargs.get("direction", "large-small")

        for idx, frame_rgb_crop in enumerate(driving_rgb_crop_lst):
            if trajectory.start == -1:
                lmk = self._detect_initial_lmk(frame_rgb_crop, idx, direction)
                if lmk is None:
                    raise Exception(f"No face detected in the frame #{idx}")
                trajectory.start, trajectory.end = idx, idx
            else:
                lmk = self.landmark_runner.run(frame_rgb_crop, trajectory.lmk_lst[-1])
                trajectory.end = idx

            trajectory.lmk_lst.append(lmk)

        return trajectory.lmk_lst