import os
import numpy as np
import math
import cv2
import onnxruntime
import time
import queue
import threading
import copy
from .retinaface import RetinaFaceDetector
from .remedian import remedian
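# Face tracking pipeline: coarse heatmap face detection, ONNX landmark
# regression, PnP head pose estimation and gaze/eye state extraction.
# This module appears to be derived from the OpenSeeFace tracker.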
def resolve(name):
f = os.path.join(os.path.dirname(__file__), name)
return f
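# Clamp a point into the image bounds. Note the returned y is offset by one;
# this quirk is kept as downstream crop computations appear to depend on it.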
def clamp_to_im(pt, w, h):
x = pt[0]
y = pt[1]
if x < 0:
x = 0
if y < 0:
y = 0
if x >= w:
x = w-1
if y >= h:
y = h-1
return (int(x), int(y+1))
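# Rotate a point around an origin by -a radians (clockwise for positive a).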
def rotate(origin, point, a):
a = -a
ox, oy = origin
px, py = point
qx = ox + math.cos(a) * (px - ox) - math.sin(a) * (py - oy)
qy = oy + math.sin(a) * (px - ox) + math.cos(a) * (py - oy)
return qx, qy
def angle(p1, p2):
p1 = np.array(p1)
p2 = np.array(p2)
a = np.arctan2(*(p2 - p1)[::-1])
return (a % (2 * np.pi))
def compensate(p1, p2):
a = angle(p1, p2)
return rotate(p1, p2, a), a
def rotate_image(image, a, center):
(h, w) = image.shape[:2]
a = np.rad2deg(a)
M = cv2.getRotationMatrix2D((float(center[0]), float(center[1])), a, 1.0)
rotated = cv2.warpAffine(image, M, (w, h))
return rotated
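# Overlap test for (x, y, w, h) rectangles: True when the intersection over
# union of the two rectangles is at least `amount`.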
def intersects(r1, r2, amount=0.3):
area1 = r1[2] * r1[3]
area2 = r2[2] * r2[3]
inter = 0.0
total = area1 + area2
r1_x1, r1_y1, w, h = r1
r1_x2 = r1_x1 + w
r1_y2 = r1_y1 + h
r2_x1, r2_y1, w, h = r2
r2_x2 = r2_x1 + w
r2_y2 = r2_y1 + h
left = max(r1_x1, r2_x1)
right = min(r1_x2, r2_x2)
top = max(r1_y1, r2_y1)
bottom = min(r1_y2, r2_y2)
if left < right and top < bottom:
inter = (right - left) * (bottom - top)
total -= inter
if inter / total >= amount:
return True
return False
#return not (r1_x1 > r2_x2 or r1_x2 < r2_x1 or r1_y1 > r2_y2 or r1_y2 < r2_y1)
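# Assign overlapping rectangles to shared group ids. This is a single pass
# rather than a full transitive (union-find style) grouping.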
def group_rects(rects):
rect_groups = {}
for rect in rects:
rect_groups[str(rect)] = [-1, -1, []]
group_id = 0
for i, rect in enumerate(rects):
name = str(rect)
group = group_id
group_id += 1
if rect_groups[name][0] < 0:
rect_groups[name] = [group, -1, []]
else:
group = rect_groups[name][0]
for j, other_rect in enumerate(rects):
            if i == j:
                continue
            if intersects(rect, other_rect):
rect_groups[str(other_rect)] = [group, -1, []]
return rect_groups
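# Numerically clamped inverse sigmoid, scaled by 1/factor; used to decode
# sub-cell offsets from the model's heatmap outputs.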
def logit(p, factor=16.0):
if p >= 1.0:
p = 0.9999999
if p <= 0.0:
p = 0.0000001
p = p/(1-p)
return float(np.log(p)) / float(factor)
def logit_arr(p, factor=16.0):
p = np.clip(p, 0.0000001, 0.9999999)
return np.log(p / (1 - p)) / float(factor)
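# Convert a 3x3 rotation matrix to a quaternion, branching on the largest
# diagonal element for numerical stability (Shepperd's method).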
def matrix_to_quaternion(m):
t = 0.0
q = [0.0, 0.0, 0, 0.0]
if m[2,2] < 0:
if m[0,0] > m[1,1]:
t = 1 + m[0,0] - m[1,1] - m[2,2]
q = [t, m[0,1]+m[1,0], m[2,0]+m[0,2], m[1,2]-m[2,1]]
else:
t = 1 - m[0,0] + m[1,1] - m[2,2]
q = [m[0,1]+m[1,0], t, m[1,2]+m[2,1], m[2,0]-m[0,2]]
else:
if m[0,0] < -m[1,1]:
t = 1 - m[0,0] - m[1,1] + m[2,2]
q = [m[2,0]+m[0,2], m[1,2]+m[2,1], t, m[0,1]-m[1,0]]
else:
t = 1 + m[0,0] + m[1,1] + m[2,2]
q = [m[1,2]-m[2,1], m[2,0]-m[0,2], m[0,1]-m[1,0], t]
q = np.array(q, np.float32) * 0.5 / np.sqrt(t)
return q
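# Runs a single landmark inference on its own thread and posts the result
# (or just the session, on low confidence) back through the queue so the
# session can be reused for the next crop.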
def worker_thread(session, frame, input, crop_info, queue, input_name, idx, tracker):
output = session.run([], {input_name: input})[0]
conf, lms = tracker.landmarks(output[0], crop_info)
if conf > tracker.threshold:
try:
eye_state = tracker.get_eye_state(frame, lms, single=True)
        except Exception:
eye_state = [(1.0, 0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0)]
queue.put((session, conf, (lms, eye_state), crop_info, idx))
else:
queue.put((session,))
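# Tracks one scalar expression feature: values are normalized against a
# running median with soft min/max bounds that decay toward hard limits,
# then smoothed with an exponential moving average (alpha).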
class Feature():
def __init__(self, threshold=0.15, alpha=0.2, hard_factor=0.15, decay=0.001, max_feature_updates=0):
self.median = remedian()
self.min = None
self.max = None
self.hard_min = None
self.hard_max = None
self.threshold = threshold
self.alpha = alpha
self.hard_factor = hard_factor
self.decay = decay
self.last = 0
self.current_median = 0
self.update_count = 0
self.max_feature_updates = max_feature_updates
self.first_seen = -1
self.updating = True
def update(self, x, now=0):
if self.max_feature_updates > 0:
if self.first_seen == -1:
                self.first_seen = now
new = self.update_state(x, now=now)
filtered = self.last * self.alpha + new * (1 - self.alpha)
self.last = filtered
return filtered
def update_state(self, x, now=0):
updating = self.updating and (self.max_feature_updates == 0 or now - self.first_seen < self.max_feature_updates)
if updating:
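            # remedian appears to overload "+" to insert a sample into the running estimate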
self.median + x
self.current_median = self.median.median()
else:
self.updating = False
median = self.current_median
if self.min is None:
if x < median and (median - x) / median > self.threshold:
if updating:
self.min = x
self.hard_min = self.min + self.hard_factor * (median - self.min)
return -1
return 0
else:
if x < self.min:
if updating:
self.min = x
self.hard_min = self.min + self.hard_factor * (median - self.min)
return -1
if self.max is None:
if x > median and (x - median) / median > self.threshold:
if updating:
self.max = x
self.hard_max = self.max - self.hard_factor * (self.max - median)
return 1
return 0
else:
if x > self.max:
if updating:
self.max = x
self.hard_max = self.max - self.hard_factor * (self.max - median)
return 1
if updating:
if self.min < self.hard_min:
self.min = self.hard_min * self.decay + self.min * (1 - self.decay)
if self.max > self.hard_max:
self.max = self.hard_max * self.decay + self.max * (1 - self.decay)
if x < median:
return - (1 - (x - self.min) / (median - self.min))
elif x > median:
return (x - median) / (self.max - median)
return 0
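# Derives blendshape-like expression features (eyes, eyebrows, mouth) from
# the tracked landmarks, each normalized through its own Feature instance.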
class FeatureExtractor():
def __init__(self, max_feature_updates=0):
self.eye_l = Feature(max_feature_updates=max_feature_updates)
self.eye_r = Feature(max_feature_updates=max_feature_updates)
self.eyebrow_updown_l = Feature(max_feature_updates=max_feature_updates)
self.eyebrow_updown_r = Feature(max_feature_updates=max_feature_updates)
self.eyebrow_quirk_l = Feature(threshold=0.05, max_feature_updates=max_feature_updates)
self.eyebrow_quirk_r = Feature(threshold=0.05, max_feature_updates=max_feature_updates)
self.eyebrow_steepness_l = Feature(threshold=0.05, max_feature_updates=max_feature_updates)
self.eyebrow_steepness_r = Feature(threshold=0.05, max_feature_updates=max_feature_updates)
self.mouth_corner_updown_l = Feature(max_feature_updates=max_feature_updates)
self.mouth_corner_updown_r = Feature(max_feature_updates=max_feature_updates)
self.mouth_corner_inout_l = Feature(threshold=0.02, max_feature_updates=max_feature_updates)
self.mouth_corner_inout_r = Feature(threshold=0.02, max_feature_updates=max_feature_updates)
self.mouth_open = Feature(max_feature_updates=max_feature_updates)
self.mouth_wide = Feature(threshold=0.02, max_feature_updates=max_feature_updates)
def align_points(self, a, b, pts):
a = tuple(a)
b = tuple(b)
alpha = angle(a, b)
alpha = np.rad2deg(alpha)
if alpha >= 90:
alpha = - (alpha - 180)
if alpha <= -90:
alpha = - (alpha + 180)
alpha = np.deg2rad(alpha)
aligned_pts = []
for pt in pts:
aligned_pts.append(np.array(rotate(a, pt, alpha)))
return alpha, np.array(aligned_pts)
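    # Landmark indexing below: 0-16 jaw, 17-26 eyebrows, 27-35 nose, 36-47
    # eyes, 48-65 mouth, matching the 66-point layout from Tracker.landmarks.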
def update(self, pts, full=True):
features = {}
now = time.perf_counter()
norm_distance_x = np.mean([pts[0, 0] - pts[16, 0], pts[1, 0] - pts[15, 0]])
norm_distance_y = np.mean([pts[27, 1] - pts[28, 1], pts[28, 1] - pts[29, 1], pts[29, 1] - pts[30, 1]])
a1, f_pts = self.align_points(pts[42], pts[45], pts[[43, 44, 47, 46]])
f = abs((np.mean([f_pts[0,1], f_pts[1,1]]) - np.mean([f_pts[2,1], f_pts[3,1]])) / norm_distance_y)
features["eye_l"] = self.eye_l.update(f, now)
a2, f_pts = self.align_points(pts[36], pts[39], pts[[37, 38, 41, 40]])
f = abs((np.mean([f_pts[0,1], f_pts[1,1]]) - np.mean([f_pts[2,1], f_pts[3,1]])) / norm_distance_y)
features["eye_r"] = self.eye_r.update(f, now)
if full:
a3, _ = self.align_points(pts[0], pts[16], [])
a4, _ = self.align_points(pts[31], pts[35], [])
norm_angle = np.mean(list(map(np.rad2deg, [a1, a2, a3, a4])))
a, f_pts = self.align_points(pts[22], pts[26], pts[[22, 23, 24, 25, 26]])
features["eyebrow_steepness_l"] = self.eyebrow_steepness_l.update(-np.rad2deg(a) - norm_angle, now)
f = np.max(np.abs(np.array(f_pts[1:4]) - f_pts[0, 1])) / norm_distance_y
features["eyebrow_quirk_l"] = self.eyebrow_quirk_l.update(f, now)
a, f_pts = self.align_points(pts[17], pts[21], pts[[17, 18, 19, 20, 21]])
features["eyebrow_steepness_r"] = self.eyebrow_steepness_r.update(np.rad2deg(a) - norm_angle, now)
f = np.max(np.abs(np.array(f_pts[1:4]) - f_pts[0, 1])) / norm_distance_y
features["eyebrow_quirk_r"] = self.eyebrow_quirk_r.update(f, now)
else:
features["eyebrow_steepness_l"] = 0.
features["eyebrow_steepness_r"] = 0.
features["eyebrow_quirk_l"] = 0.
features["eyebrow_quirk_r"] = 0.
f = (np.mean([pts[22, 1], pts[26, 1]]) - pts[27, 1]) / norm_distance_y
features["eyebrow_updown_l"] = self.eyebrow_updown_l.update(f, now)
f = (np.mean([pts[17, 1], pts[21, 1]]) - pts[27, 1]) / norm_distance_y
features["eyebrow_updown_r"] = self.eyebrow_updown_r.update(f, now)
upper_mouth_line = np.mean([pts[49, 1], pts[50, 1], pts[51, 1]])
center_line = np.mean([pts[50, 0], pts[60, 0], pts[27, 0], pts[30, 0], pts[64, 0], pts[55, 0]])
f = (upper_mouth_line - pts[62, 1]) / norm_distance_y
features["mouth_corner_updown_l"] = self.mouth_corner_updown_l.update(f, now)
if full:
f = abs(center_line - pts[62, 0]) / norm_distance_x
features["mouth_corner_inout_l"] = self.mouth_corner_inout_l.update(f, now)
else:
features["mouth_corner_inout_l"] = 0.
f = (upper_mouth_line - pts[58, 1]) / norm_distance_y
features["mouth_corner_updown_r"] = self.mouth_corner_updown_r.update(f, now)
if full:
f = abs(center_line - pts[58, 0]) / norm_distance_x
features["mouth_corner_inout_r"] = self.mouth_corner_inout_r.update(f, now)
else:
features["mouth_corner_inout_r"] = 0.
f = abs(np.mean(pts[[59,60,61], 1], axis=0) - np.mean(pts[[63,64,65], 1], axis=0)) / norm_distance_y
features["mouth_open"] = self.mouth_open.update(f, now)
f = abs(pts[58, 0] - pts[62, 0]) / norm_distance_x
features["mouth_wide"] = self.mouth_wide.update(f, now)
return features
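# Per-face tracking state: latest landmarks, pose (rotation/translation),
# a per-face copy of the 3D reference model and its adjustment counters.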
class FaceInfo():
def __init__(self, id, tracker):
self.id = id
self.frame_count = -1
self.tracker = tracker
self.contour_pts = [0,1,8,15,16,27,28,29,30,31,32,33,34,35]
self.face_3d = copy.copy(self.tracker.face_3d)
if self.tracker.model_type == -1:
self.contour_pts = [0,2,8,14,16,27,30,33]
self.reset()
self.alive = False
self.coord = None
self.base_scale_v = self.tracker.face_3d[27:30, 1] - self.tracker.face_3d[28:31, 1]
self.base_scale_h = np.abs(self.tracker.face_3d[[0, 36, 42], 0] - self.tracker.face_3d[[16, 39, 45], 0])
self.limit_3d_adjustment = True
self.update_count_delta = 75.
self.update_count_max = 7500.
if self.tracker.max_feature_updates > 0:
self.features = FeatureExtractor(self.tracker.max_feature_updates)
def reset(self):
self.alive = False
self.conf = None
self.lms = None
self.eye_state = None
self.rotation = None
self.translation = None
self.success = None
self.quaternion = None
self.euler = None
self.pnp_error = None
self.pts_3d = None
self.eye_blink = None
self.bbox = None
self.pnp_error = 0
if self.tracker.max_feature_updates < 1:
self.features = FeatureExtractor(0)
self.current_features = {}
self.contour = np.zeros((21,3))
self.update_counts = np.zeros((66,2))
self.update_contour()
self.fail_count = 0
def update(self, result, coord, frame_count):
self.frame_count = frame_count
if result is None:
self.reset()
else:
self.conf, (self.lms, self.eye_state) = result
self.coord = coord
self.alive = True
def update_contour(self):
self.contour = np.array(self.face_3d[self.contour_pts])
def normalize_pts3d(self, pts_3d):
# Calculate angle using nose
pts_3d[:, 0:2] -= pts_3d[30, 0:2]
alpha = angle(pts_3d[30, 0:2], pts_3d[27, 0:2])
alpha -= np.deg2rad(90)
R = np.matrix([[np.cos(alpha), -np.sin(alpha)], [np.sin(alpha), np.cos(alpha)]])
pts_3d[:, 0:2] = (pts_3d - pts_3d[30])[:, 0:2].dot(R) + pts_3d[30, 0:2]
# Vertical scale
pts_3d[:, 1] /= np.mean((pts_3d[27:30, 1] - pts_3d[28:31, 1]) / self.base_scale_v)
# Horizontal scale
pts_3d[:, 0] /= np.mean(np.abs(pts_3d[[0, 36, 42], 0] - pts_3d[[16, 39, 45], 0]) / self.base_scale_h)
return pts_3d
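    # Refine this face's 3D reference model: randomly perturb eligible points
    # and keep perturbations that reduce reprojection error against the
    # current 2D landmarks, weighted by per-landmark confidence.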
def adjust_3d(self):
if self.conf < 0.4 or self.pnp_error > 300:
return
if self.tracker.model_type != -1 and not self.tracker.static_model:
max_runs = 1
eligible = np.delete(np.arange(0, 66), [30])
changed_any = False
update_type = -1
d_o = np.ones((66,))
d_c = np.ones((66,))
for runs in range(max_runs):
r = 1.0 + np.random.random_sample((66,3)) * 0.02 - 0.01
r[30, :] = 1.0
if self.euler[0] > -165 and self.euler[0] < 145:
continue
elif self.euler[1] > -10 and self.euler[1] < 20:
r[:, 2] = 1.0
update_type = 0
else:
r[:, 0:2] = 1.0
if self.euler[2] > 120 or self.euler[2] < 60:
continue
# Enable only one side of the points, depending on direction
elif self.euler[1] < -10:
update_type = 1
r[[0, 1, 2, 3, 4, 5, 6, 7, 17, 18, 19, 20, 21, 31, 32, 36, 37, 38, 39, 40, 41, 48, 49, 56, 57, 58, 59, 65], 2] = 1.0
eligible = [8, 9, 10, 11, 12, 13, 14, 15, 16, 22, 23, 24, 25, 26, 27, 28, 29, 33, 34, 35, 42, 43, 44, 45, 46, 47, 50, 51, 52, 53, 54, 55, 60, 61, 62, 63, 64]
else:
update_type = 1
r[[9, 10, 11, 12, 13, 14, 15, 16, 22, 23, 24, 25, 26, 34, 35, 42, 43, 44, 45, 46, 47, 51, 52, 53, 54, 61, 62, 63], 2] = 1.0
eligible = [0, 1, 2, 3, 4, 5, 6, 7, 8, 17, 18, 19, 20, 21, 27, 28, 29, 31, 32, 33, 36, 37, 38, 39, 40, 41, 48, 49, 50, 55, 56, 57, 58, 59, 60, 64, 65]
if self.limit_3d_adjustment:
eligible = np.nonzero(self.update_counts[:, update_type] < self.update_counts[:, abs(update_type - 1)] + self.update_count_delta)[0]
if eligible.shape[0] <= 0:
break
if runs == 0:
updated = copy.copy(self.face_3d[0:66])
o_projected = np.ones((66,2))
o_projected[eligible] = np.squeeze(np.array(cv2.projectPoints(self.face_3d[eligible], self.rotation, self.translation, self.tracker.camera, self.tracker.dist_coeffs)[0]), 1)
c = updated * r
c_projected = np.zeros((66,2))
c_projected[eligible] = np.squeeze(np.array(cv2.projectPoints(c[eligible], self.rotation, self.translation, self.tracker.camera, self.tracker.dist_coeffs)[0]), 1)
changed = False
d_o[eligible] = np.linalg.norm(o_projected[eligible] - self.lms[eligible, 0:2], axis=1)
d_c[eligible] = np.linalg.norm(c_projected[eligible] - self.lms[eligible, 0:2], axis=1)
indices = np.nonzero(d_c < d_o)[0]
if indices.shape[0] > 0:
if self.limit_3d_adjustment:
indices = np.intersect1d(indices, eligible)
if indices.shape[0] > 0:
self.update_counts[indices, update_type] += 1
updated[indices] = c[indices]
o_projected[indices] = c_projected[indices]
changed = True
changed_any = changed_any or changed
if not changed:
break
if changed_any:
# Update weighted by point confidence
weights = np.zeros((66,3))
weights[:, :] = self.lms[0:66, 2:3]
weights[weights > 0.7] = 1.0
weights = 1.0 - weights
update_indices = np.arange(0, 66)
if self.limit_3d_adjustment:
update_indices = np.nonzero(self.update_counts[:, update_type] <= self.update_count_max)[0]
self.face_3d[update_indices] = self.face_3d[update_indices] * weights[update_indices] + updated[update_indices] * (1. - weights[update_indices])
self.update_contour()
self.pts_3d = self.normalize_pts3d(self.pts_3d)
if self.tracker.feature_level == 2:
self.current_features = self.features.update(self.pts_3d[:, 0:2])
self.eye_blink = []
self.eye_blink.append(1 - min(max(0, -self.current_features["eye_r"]), 1))
self.eye_blink.append(1 - min(max(0, -self.current_features["eye_l"]), 1))
elif self.tracker.feature_level == 1:
self.current_features = self.features.update(self.pts_3d[:, 0:2], False)
self.eye_blink = []
self.eye_blink.append(1 - min(max(0, -self.current_features["eye_r"]), 1))
self.eye_blink.append(1 - min(max(0, -self.current_features["eye_l"]), 1))
def get_model_base_path(model_dir):
    model_base_path = resolve("models")
if model_dir is None:
if not os.path.exists(model_base_path):
model_base_path = resolve(os.path.join("..", "models"))
else:
model_base_path = model_dir
return model_base_path
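# Main tracker: owns the ONNX sessions for detection, landmarks and gaze,
# the 3D reference face model used for PnP pose estimation, and the list of
# per-face FaceInfo states.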
class Tracker():
def __init__(self, width, height, model_type=3, detection_threshold=0.6, threshold=None, max_faces=1, discard_after=5, scan_every=3, bbox_growth=0.0, max_threads=4, silent=False, model_dir=None, no_gaze=False, use_retinaface=False, max_feature_updates=0, static_model=False, feature_level=2, try_hard=False):
options = onnxruntime.SessionOptions()
options.inter_op_num_threads = 1
options.intra_op_num_threads = min(max_threads,4)
options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
options.log_severity_level = 3
self.model_type = model_type
self.models = [
"lm_model0_opt.onnx",
"lm_model1_opt.onnx",
"lm_model2_opt.onnx",
"lm_model3_opt.onnx",
"lm_model4_opt.onnx"
]
model = "lm_modelT_opt.onnx"
if model_type >= 0:
model = self.models[self.model_type]
if model_type == -2:
model = "lm_modelV_opt.onnx"
if model_type == -3:
model = "lm_modelU_opt.onnx"
model_base_path = get_model_base_path(model_dir)
if threshold is None:
threshold = 0.6
if model_type < 0:
threshold = 0.87
self.retinaface = RetinaFaceDetector(model_path=os.path.join(model_base_path, "retinaface_640x640_opt.onnx"), json_path=os.path.join(model_base_path, "priorbox_640x640.json"), threads=max(max_threads,4), top_k=max_faces, res=(640, 640))
self.retinaface_scan = RetinaFaceDetector(model_path=os.path.join(model_base_path, "retinaface_640x640_opt.onnx"), json_path=os.path.join(model_base_path, "priorbox_640x640.json"), threads=2, top_k=max_faces, res=(640, 640))
self.use_retinaface = use_retinaface
# Single face instance with multiple threads
self.session = onnxruntime.InferenceSession(os.path.join(model_base_path, model), sess_options=options)
# Multiple faces with single threads
self.sessions = []
self.max_workers = max(min(max_threads, max_faces), 1)
extra_threads = max_threads % self.max_workers
for i in range(self.max_workers):
options = onnxruntime.SessionOptions()
options.inter_op_num_threads = 1
            # Divide available threads evenly across worker sessions; the
            # remainder (extra_threads) is handed out one per worker below.
            options.intra_op_num_threads = max_threads // self.max_workers
if options.intra_op_num_threads < 1:
options.intra_op_num_threads = 1
elif i < extra_threads:
options.intra_op_num_threads += 1
options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
self.sessions.append(onnxruntime.InferenceSession(os.path.join(model_base_path, model), sess_options=options))
self.input_name = self.session.get_inputs()[0].name
options = onnxruntime.SessionOptions()
options.inter_op_num_threads = 1
options.intra_op_num_threads = max(max_threads,4)
options.execution_mode = onnxruntime.ExecutionMode.ORT_SEQUENTIAL
options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
options.log_severity_level = 3
self.gaze_model = onnxruntime.InferenceSession(os.path.join(model_base_path, "mnv3_gaze32_split_opt.onnx"), sess_options=options)
options.intra_op_num_threads = 1
self.gaze_model_single = onnxruntime.InferenceSession(os.path.join(model_base_path, "mnv3_gaze32_split_opt.onnx"), sess_options=options)
self.detection = onnxruntime.InferenceSession(os.path.join(model_base_path, "mnv3_detection_opt.onnx"), sess_options=options)
self.faces = []
# Image normalization constants
self.mean = np.float32(np.array([0.485, 0.456, 0.406]))
self.std = np.float32(np.array([0.229, 0.224, 0.225]))
self.mean = self.mean / self.std
self.std = self.std * 255.0
self.mean = - self.mean
self.std = 1.0 / self.std
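        # mean and std are folded so that `im * std + mean` on raw 0-255
        # pixels equals the usual ((im / 255) - mean) / std normalization.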
self.mean_32 = np.tile(self.mean, [32, 32, 1])
self.std_32 = np.tile(self.std, [32, 32, 1])
self.mean_224 = np.tile(self.mean, [224, 224, 1])
self.std_224 = np.tile(self.std, [224, 224, 1])
# PnP solving
self.face_3d = np.array([
[ 0.4551769692672 , 0.300895790030204, -0.764429433974752],
[ 0.448998827123556, 0.166995837790733, -0.765143004071253],
[ 0.437431554952677, 0.022655479179981, -0.739267175112735],
[ 0.415033422928434, -0.088941454648772, -0.747947437846473],
[ 0.389123587370091, -0.232380029794684, -0.704788385327458],
[ 0.334630113904382, -0.361265387599081, -0.615587579236862],
[ 0.263725112132858, -0.460009725616771, -0.491479221041573],
[ 0.16241621322721 , -0.558037146073869, -0.339445180872282],
[ 0. , -0.621079019321682, -0.287294770748887],
[-0.16241621322721 , -0.558037146073869, -0.339445180872282],
[-0.263725112132858, -0.460009725616771, -0.491479221041573],
[-0.334630113904382, -0.361265387599081, -0.615587579236862],
[-0.389123587370091, -0.232380029794684, -0.704788385327458],
[-0.415033422928434, -0.088941454648772, -0.747947437846473],
[-0.437431554952677, 0.022655479179981, -0.739267175112735],
[-0.448998827123556, 0.166995837790733, -0.765143004071253],
[-0.4551769692672 , 0.300895790030204, -0.764429433974752],
[ 0.385529968662985, 0.402800553948697, -0.310031082540741],
[ 0.322196658344302, 0.464439136821772, -0.250558059367669],
[ 0.25409760441282 , 0.46420381416882 , -0.208177722146526],
[ 0.186875436782135, 0.44706071961879 , -0.145299823706503],
[ 0.120880983543622, 0.423566314072968, -0.110757158774771],
[-0.120880983543622, 0.423566314072968, -0.110757158774771],
[-0.186875436782135, 0.44706071961879 , -0.145299823706503],
[-0.25409760441282 , 0.46420381416882 , -0.208177722146526],
[-0.322196658344302, 0.464439136821772, -0.250558059367669],
[-0.385529968662985, 0.402800553948697, -0.310031082540741],
[ 0. , 0.293332603215811, -0.137582088779393],
[ 0. , 0.194828701837823, -0.069158109325951],
[ 0. , 0.103844017393155, -0.009151819844964],
[ 0. , 0. , 0. ],
[ 0.080626352317973, -0.041276068128093, -0.134161035564826],
[ 0.046439347377934, -0.057675223874769, -0.102990627164664],
[ 0. , -0.068753126205604, -0.090545348482397],
[-0.046439347377934, -0.057675223874769, -0.102990627164664],
[-0.080626352317973, -0.041276068128093, -0.134161035564826],
[ 0.315905195966084, 0.298337502555443, -0.285107407636464],
[ 0.275252345439353, 0.312721904921771, -0.244558251170671],
[ 0.176394511553111, 0.311907184376107, -0.219205360345231],
[ 0.131229723798772, 0.284447361805627, -0.234239149487417],
[ 0.184124948330084, 0.260179585304867, -0.226590776513707],
[ 0.279433549294448, 0.267363071770222, -0.248441437111633],
[-0.131229723798772, 0.284447361805627, -0.234239149487417],
[-0.176394511553111, 0.311907184376107, -0.219205360345231],
[-0.275252345439353, 0.312721904921771, -0.244558251170671],
[-0.315905195966084, 0.298337502555443, -0.285107407636464],
[-0.279433549294448, 0.267363071770222, -0.248441437111633],
[-0.184124948330084, 0.260179585304867, -0.226590776513707],
[ 0.121155252430729, -0.208988660580347, -0.160606287940521],
[ 0.041356305910044, -0.194484199722098, -0.096159882202821],
[ 0. , -0.205180167345702, -0.083299217789729],
[-0.041356305910044, -0.194484199722098, -0.096159882202821],
[-0.121155252430729, -0.208988660580347, -0.160606287940521],
[-0.132325402795928, -0.290857984604968, -0.187067868218105],
[-0.064137791831655, -0.325377847425684, -0.158924039726607],
[ 0. , -0.343742581679188, -0.113925986025684],
[ 0.064137791831655, -0.325377847425684, -0.158924039726607],
[ 0.132325402795928, -0.290857984604968, -0.187067868218105],
[ 0.181481567104525, -0.243239316141725, -0.231284988892766],
[ 0.083999507750469, -0.239717753728704, -0.155256465640701],
[ 0. , -0.256058040176369, -0.0950619498899 ],
[-0.083999507750469, -0.239717753728704, -0.155256465640701],
[-0.181481567104525, -0.243239316141725, -0.231284988892766],
[-0.074036069749345, -0.250689938345682, -0.177346470406188],
[ 0. , -0.264945854681568, -0.112349967428413],
[ 0.074036069749345, -0.250689938345682, -0.177346470406188],
# Pupils and eyeball centers
[ 0.257990002632141, 0.276080012321472, -0.219998998939991],
[-0.257990002632141, 0.276080012321472, -0.219998998939991],
[ 0.257990002632141, 0.276080012321472, -0.324570998549461],
[-0.257990002632141, 0.276080012321472, -0.324570998549461]
], np.float32)
self.camera = np.array([[width, 0, width/2], [0, width, height/2], [0, 0, 1]], np.float32)
self.inverse_camera = np.linalg.inv(self.camera)
self.dist_coeffs = np.zeros((4,1))
self.frame_count = 0
self.width = width
self.height = height
self.threshold = threshold
self.detection_threshold = detection_threshold
self.max_faces = max_faces
self.max_threads = max_threads
self.discard = 0
self.discard_after = discard_after
self.detected = 0
self.wait_count = 0
self.scan_every = scan_every
self.bbox_growth = bbox_growth
self.silent = silent
self.try_hard = try_hard
self.res = 224.
self.mean_res = self.mean_224
self.std_res = self.std_224
if model_type < 0:
self.res = 56.
self.mean_res = np.tile(self.mean, [56, 56, 1])
self.std_res = np.tile(self.std, [56, 56, 1])
if model_type < -1:
self.res = 112.
self.mean_res = np.tile(self.mean, [112, 112, 1])
self.std_res = np.tile(self.std, [112, 112, 1])
self.res_i = int(self.res)
self.out_res = 27.
if model_type < 0:
self.out_res = 6.
if model_type < -1:
self.out_res = 13.
self.out_res_i = int(self.out_res) + 1
self.logit_factor = 16.
if model_type < 0:
self.logit_factor = 8.
if model_type < -1:
self.logit_factor = 16.
self.no_gaze = no_gaze
self.debug_gaze = False
self.feature_level = feature_level
if model_type == -1:
self.feature_level = min(feature_level, 1)
self.max_feature_updates = max_feature_updates
self.static_model = static_model
self.face_info = [FaceInfo(id, self) for id in range(max_faces)]
self.fail_count = 0
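    # Coarse face detection on a 224x224 resize: the model emits a 56x56
    # heatmap (stride 4) and a radius channel; local maxima above the
    # detection threshold become (x, y, w, h) candidates in frame coordinates.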
def detect_faces(self, frame):
im = cv2.resize(frame, (224, 224), interpolation=cv2.INTER_LINEAR)[:,:,::-1] * self.std_224 + self.mean_224
im = np.expand_dims(im, 0)
im = np.transpose(im, (0,3,1,2))
outputs, maxpool = self.detection.run([], {'input': im})
outputs = np.array(outputs)
maxpool = np.array(maxpool)
outputs[0, 0, outputs[0, 0] != maxpool[0, 0]] = 0
detections = np.flip(np.argsort(outputs[0,0].flatten()))
results = []
for det in detections[0:self.max_faces]:
y, x = det // 56, det % 56
c = outputs[0, 0, y, x]
r = outputs[0, 1, y, x] * 112.
x *= 4
y *= 4
            if c < self.detection_threshold:
                break
            results.append((x - r, y - r, 2 * r, 2 * r))
results = np.array(results).astype(np.float32)
if results.shape[0] > 0:
results[:, [0,2]] *= frame.shape[1] / 224.
results[:, [1,3]] *= frame.shape[0] / 224.
return results
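    # Decode the landmark heatmap tensor into one (y, x, confidence) row per
    # landmark: each point's argmax cell plus a logit-decoded sub-cell offset,
    # mapped back to frame coordinates via the crop offset and scale. Despite
    # the t_x/t_y naming, column 0 is the vertical coordinate; extract_face
    # flips the order back to (x, y).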
def landmarks(self, tensor, crop_info):
crop_x1, crop_y1, scale_x, scale_y, _ = crop_info
avg_conf = 0
res = self.res - 1
c0, c1, c2 = 66, 132, 198
if self.model_type == -1:
c0, c1, c2 = 30, 60, 90
t_main = tensor[0:c0].reshape((c0,self.out_res_i * self.out_res_i))
t_m = t_main.argmax(1)
indices = np.expand_dims(t_m, 1)
t_conf = np.take_along_axis(t_main, indices, 1).reshape((c0,))
t_off_x = np.take_along_axis(tensor[c0:c1].reshape((c0,self.out_res_i * self.out_res_i)), indices, 1).reshape((c0,))
t_off_y = np.take_along_axis(tensor[c1:c2].reshape((c0,self.out_res_i * self.out_res_i)), indices, 1).reshape((c0,))
t_off_x = res * logit_arr(t_off_x, self.logit_factor)
t_off_y = res * logit_arr(t_off_y, self.logit_factor)
t_x = crop_y1 + scale_y * (res * np.floor(t_m / self.out_res_i) / self.out_res + t_off_x)
t_y = crop_x1 + scale_x * (res * np.floor(np.mod(t_m, self.out_res_i)) / self.out_res + t_off_y)
avg_conf = np.average(t_conf)
lms = np.stack([t_x, t_y, t_conf], 1)
lms[np.isnan(lms).any(axis=1)] = np.array([0.,0.,0.], dtype=np.float32)
if self.model_type == -1:
lms = lms[[0,0,1,1,1,2,2,2,3,3,3,4,4,4,5,5,6,7,7,8,8,9,10,10,11,11,12,21,21,21,22,23,23,23,23,23,13,14,14,15,16,16,17,18,18,19,20,20,24,25,25,25,26,26,27,27,27,24,24,28,28,28,26,29,29,29]]
#lms[[1,3,4,6,7,9,10,12,13,15,18,20,23,25,38,40,44,46]] += lms[[2,2,5,5,8,8,11,11,14,16,19,21,24,26,39,39,45,45]]
#lms[[3,4,6,7,9,10,12,13]] += lms[[5,5,8,8,11,11,14,14]]
#lms[[1,15,18,20,23,25,38,40,44,46]] /= 2.0
#lms[[3,4,6,7,9,10,12,13]] /= 3.0
part_avg = np.mean(np.partition(lms[:,2],3)[0:3])
if part_avg < 0.65:
avg_conf = part_avg
return (avg_conf, np.array(lms))
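    # Estimate head pose with solvePnP on the face contour points, then
    # back-project the 2D landmarks to 3D using the depth of the posed
    # reference model; pupils and eyeball centers are filled in separately.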
def estimate_depth(self, face_info):
lms = np.concatenate((face_info.lms, np.array([[face_info.eye_state[0][1], face_info.eye_state[0][2], face_info.eye_state[0][3]], [face_info.eye_state[1][1], face_info.eye_state[1][2], face_info.eye_state[1][3]]], np.float32)), 0)
image_pts = np.array(lms)[face_info.contour_pts, 0:2]
success = False
        if face_info.rotation is not None:
success, face_info.rotation, face_info.translation = cv2.solvePnP(face_info.contour, image_pts, self.camera, self.dist_coeffs, useExtrinsicGuess=True, rvec=np.transpose(face_info.rotation), tvec=np.transpose(face_info.translation), flags=cv2.SOLVEPNP_ITERATIVE)
else:
rvec = np.array([0, 0, 0], np.float32)
tvec = np.array([0, 0, 0], np.float32)
success, face_info.rotation, face_info.translation = cv2.solvePnP(face_info.contour, image_pts, self.camera, self.dist_coeffs, useExtrinsicGuess=True, rvec=rvec, tvec=tvec, flags=cv2.SOLVEPNP_ITERATIVE)
rotation = face_info.rotation
translation = face_info.translation
pts_3d = np.zeros((70,3), np.float32)
if not success:
face_info.rotation = np.array([0.0, 0.0, 0.0], np.float32)
face_info.translation = np.array([0.0, 0.0, 0.0], np.float32)
return False, np.zeros(4), np.zeros(3), 99999., pts_3d, lms
else:
face_info.rotation = np.transpose(face_info.rotation)
face_info.translation = np.transpose(face_info.translation)
rmat, _ = cv2.Rodrigues(rotation)
inverse_rotation = np.linalg.inv(rmat)
t_reference = face_info.face_3d.dot(rmat.transpose())
t_reference = t_reference + face_info.translation
t_reference = t_reference.dot(self.camera.transpose())
t_depth = t_reference[:, 2]
t_depth[t_depth == 0] = 0.000001
t_depth_e = np.expand_dims(t_depth[:],1)
t_reference = t_reference[:] / t_depth_e
pts_3d[0:66] = np.stack([lms[0:66,0], lms[0:66,1], np.ones((66,))], 1) * t_depth_e[0:66]
pts_3d[0:66] = (pts_3d[0:66].dot(self.inverse_camera.transpose()) - face_info.translation).dot(inverse_rotation.transpose())
pnp_error = np.power(lms[0:17,0:2] - t_reference[0:17,0:2], 2).sum()
pnp_error += np.power(lms[30,0:2] - t_reference[30,0:2], 2).sum()
if np.isnan(pnp_error):
pnp_error = 9999999.
for i, pt in enumerate(face_info.face_3d[66:70]):
if i == 2:
# Right eyeball
                # Eyeballs have an average diameter of 12.5mm and the distance between eye corners is 30-35mm, so a conversion factor of 0.385 can be applied
eye_center = (pts_3d[36] + pts_3d[39]) / 2.0
d_corner = np.linalg.norm(pts_3d[36] - pts_3d[39])
depth = 0.385 * d_corner
pt_3d = np.array([eye_center[0], eye_center[1], eye_center[2] - depth])
pts_3d[68] = pt_3d
continue
if i == 3:
# Left eyeball
eye_center = (pts_3d[42] + pts_3d[45]) / 2.0
d_corner = np.linalg.norm(pts_3d[42] - pts_3d[45])
depth = 0.385 * d_corner
pt_3d = np.array([eye_center[0], eye_center[1], eye_center[2] - depth])
pts_3d[69] = pt_3d
continue
if i == 0:
d1 = np.linalg.norm(lms[66,0:2] - lms[36,0:2])
d2 = np.linalg.norm(lms[66,0:2] - lms[39,0:2])
d = d1 + d2
pt = (pts_3d[36] * d1 + pts_3d[39] * d2) / d
if i == 1:
d1 = np.linalg.norm(lms[67,0:2] - lms[42,0:2])
d2 = np.linalg.norm(lms[67,0:2] - lms[45,0:2])
d = d1 + d2
pt = (pts_3d[42] * d1 + pts_3d[45] * d2) / d
if i < 2:
reference = rmat.dot(pt)
reference = reference + face_info.translation
reference = self.camera.dot(reference)
depth = reference[2]
pt_3d = np.array([lms[66+i][0] * depth, lms[66+i][1] * depth, depth], np.float32)
pt_3d = self.inverse_camera.dot(pt_3d)
pt_3d = pt_3d - face_info.translation
pt_3d = inverse_rotation.dot(pt_3d)
pts_3d[66+i,:] = pt_3d[:]
pts_3d[np.isnan(pts_3d).any(axis=1)] = np.array([0.,0.,0.], dtype=np.float32)
pnp_error = np.sqrt(pnp_error / (2.0 * image_pts.shape[0]))
if pnp_error > 300:
face_info.fail_count += 1
if face_info.fail_count > 5:
# Something went wrong with adjusting the 3D model
if not self.silent:
print(f"Detected anomaly when 3D fitting face {face_info.id}. Resetting.")
face_info.face_3d = copy.copy(self.face_3d)
face_info.rotation = None
face_info.translation = np.array([0.0, 0.0, 0.0], np.float32)
face_info.update_counts = np.zeros((66,2))
face_info.update_contour()
else:
face_info.fail_count = 0
euler = cv2.RQDecomp3x3(rmat)[0]
return True, matrix_to_quaternion(rmat), euler, pnp_error, pts_3d, lms
def preprocess(self, im, crop):
x1, y1, x2, y2 = crop
im = np.float32(im[y1:y2, x1:x2])
im = cv2.resize(im, (self.res_i, self.res_i), interpolation=cv2.INTER_LINEAR) * self.std_res + self.mean_res
im = np.expand_dims(im, 0)
im = np.transpose(im, (0,3,1,2))
return im
def equalize(self, im):
im_yuv = cv2.cvtColor(im, cv2.COLOR_BGR2YUV)
im_yuv[:,:,0] = cv2.equalizeHist(im_yuv[:,:,0])
return cv2.cvtColor(im_yuv, cv2.COLOR_YUV2BGR)
def corners_to_eye(self, corners, w, h, flip):
((cx1, cy1), (cx2, cy2)) = corners
c1 = np.array([cx1, cy1])
c2 = np.array([cx2, cy2])
c2, a = compensate(c1, c2)
center = (c1 + c2) / 2.0
radius = np.linalg.norm(c1 - c2) / 2.0
radius = np.array([radius * 1.4, radius * 1.2])
upper_left = clamp_to_im(center - radius, w, h)
lower_right = clamp_to_im(center + radius, w, h)
return upper_left, lower_right, center, radius, c1, a
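    # Cut out one eye, de-rotate it around the outer corner and resize it to
    # the 32x32 input expected by the gaze model (the left eye is mirrored).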
def prepare_eye(self, frame, full_frame, lms, flip):
outer_pt = tuple(lms[0])
inner_pt = tuple(lms[1])
h, w, _ = frame.shape
(x1, y1), (x2, y2), center, radius, reference, a = self.corners_to_eye((outer_pt, inner_pt), w, h, flip)
im = rotate_image(frame[:, :, ::], a, reference)
im = im[int(y1):int(y2), int(x1):int(x2),:]
if np.prod(im.shape) < 1:
return None, None, None, None, None, None
if flip:
im = cv2.flip(im, 1)
scale = np.array([(x2 - x1), (y2 - y1)]) / 32.
im = cv2.resize(im, (32, 32), interpolation=cv2.INTER_LINEAR)
#im = self.equalize(im)
if self.debug_gaze:
if not flip:
full_frame[0:32, 0:32] = im
else:
full_frame[0:32, 32:64] = im
im = im.astype(np.float32)[:,:,::-1] * self.std_32 + self.mean_32
im = np.expand_dims(im, 0)
im = np.transpose(im, (0,3,2,1))
return im, x1, y1, scale, reference, a
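    # Crop the face region with a 20% margin. Landmarks arrive as (y, x) rows
    # and are flipped to (x, y) here; returns the crop, the shifted landmarks
    # and the crop offset.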
def extract_face(self, frame, lms):
lms = np.array(lms)[:,0:2][:,::-1]
x1, y1 = tuple(lms.min(0))
x2, y2 = tuple(lms.max(0))
radius_x = 1.2 * (x2 - x1) / 2.0
radius_y = 1.2 * (y2 - y1) / 2.0
radius = np.array((radius_x, radius_y))
center = (np.array((x1, y1)) + np.array((x2, y2))) / 2.0
w, h, _ = frame.shape
x1, y1 = clamp_to_im(center - radius, h, w)
x2, y2 = clamp_to_im(center + radius + 1, h, w)
offset = np.array((x1, y1))
        lms = (lms[:, 0:2] - offset).astype(int)  # np.int was removed in NumPy 1.24
frame = frame[y1:y2, x1:x2]
return frame, lms, offset
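    # Run the gaze model on both eye patches and decode each 8x8 heatmap into
    # a pupil position in frame coordinates plus a confidence value.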
def get_eye_state(self, frame, lms, single=False):
if self.no_gaze:
return [(1.0, 0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0)]
lms = np.array(lms)
e_x = [0,0]
e_y = [0,0]
scale = [0,0]
reference = [None, None]
angles = [0, 0]
face_frame, lms, offset = self.extract_face(frame, lms)
(right_eye, e_x[0], e_y[0], scale[0], reference[0], angles[0]) = self.prepare_eye(face_frame, frame, np.array([lms[36,0:2], lms[39,0:2]]), False)
(left_eye, e_x[1], e_y[1], scale[1], reference[1], angles[1]) = self.prepare_eye(face_frame, frame, np.array([lms[42,0:2], lms[45,0:2]]), True)
if right_eye is None or left_eye is None:
return [(1.0, 0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0)]
both_eyes = np.concatenate((right_eye, left_eye))
results = None
if single:
results = self.gaze_model_single.run([], {self.input_name: both_eyes})
else:
results = self.gaze_model.run([], {self.input_name: both_eyes})
        # Eye-open classification appears to be disabled: both eyes are always
        # reported as open (the commented expressions are the original lookups).
        open = [0, 0]
        open[0] = 1  # results[1][0].argmax()
        open[1] = 1  # results[1][1].argmax()
results = np.array(results[0])
eye_state = []
for i in range(2):
m = int(results[i][0].argmax())
x = m // 8
y = m % 8
conf = float(results[i][0][x,y])
off_x = 32.0 * logit(results[i][1][x, y], 8.0)
off_y = 32.0 * logit(results[i][2][x, y], 8.0)
            eye_x = 32.0 * float(x) / 8.0 + off_x
eye_y = 32.0 * float(y) / 8.0 + off_y
if self.debug_gaze:
if i == 0:
frame[int(eye_y), int(eye_x)] = (0, 0, 255)
frame[int(eye_y+1), int(eye_x)] = (0, 0, 255)
frame[int(eye_y+1), int(eye_x+1)] = (0, 0, 255)
frame[int(eye_y), int(eye_x+1)] = (0, 0, 255)
else:
frame[int(eye_y), 32+int(eye_x)] = (0, 0, 255)
frame[int(eye_y+1), 32+int(eye_x)] = (0, 0, 255)
frame[int(eye_y+1), 32+int(eye_x+1)] = (0, 0, 255)
frame[int(eye_y), 32+int(eye_x+1)] = (0, 0, 255)
if i == 0:
eye_x = e_x[i] + scale[i][0] * eye_x
else:
eye_x = e_x[i] + scale[i][0] * (32. - eye_x)
eye_y = e_y[i] + scale[i][1] * eye_y
eye_x, eye_y = rotate(reference[i], (eye_x, eye_y), -angles[i])
eye_x = eye_x + offset[0]
eye_y = eye_y + offset[1]
eye_state.append([open[i], eye_y, eye_x, conf])
eye_state = np.array(eye_state)
eye_state[np.isnan(eye_state).any(axis=1)] = np.array([1.,0.,0.,0.], dtype=np.float32)
return eye_state
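    # Greedily assign detection results to FaceInfo slots by distance between
    # each result's mean landmark position and the face's last known position.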
def assign_face_info(self, results):
if self.max_faces == 1 and len(results) == 1:
conf, (lms, eye_state), conf_adjust = results[0]
self.face_info[0].update((conf - conf_adjust, (lms, eye_state)), np.array(lms)[:, 0:2].mean(0), self.frame_count)
return
result_coords = []
adjusted_results = []
for conf, (lms, eye_state), conf_adjust in results:
adjusted_results.append((conf - conf_adjust, (lms, eye_state)))
result_coords.append(np.array(lms)[:, 0:2].mean(0))
results = adjusted_results
        # Build one candidate list per face; [[]] * n would alias a single
        # shared list across all faces.
        candidates = [[] for _ in range(self.max_faces)]
max_dist = 2 * np.linalg.norm(np.array([self.width, self.height]))
for i, face_info in enumerate(self.face_info):
for j, coord in enumerate(result_coords):
if face_info.coord is None:
candidates[i].append((max_dist, i, j))
else:
candidates[i].append((np.linalg.norm(face_info.coord - coord), i, j))
for i, candidate in enumerate(candidates):
candidates[i] = sorted(candidate)
found = 0
target = len(results)
used_results = {}
used_faces = {}
while found < target:
min_list = min(candidates)
candidate = min_list.pop(0)
face_idx = candidate[1]
result_idx = candidate[2]
            if result_idx not in used_results and face_idx not in used_faces:
self.face_info[face_idx].update(results[result_idx], result_coords[result_idx], self.frame_count)
min_list.clear()
used_results[result_idx] = True
used_faces[face_idx] = True
found += 1
if len(min_list) == 0:
min_list.append((2 * max_dist, face_idx, result_idx))
for face_info in self.face_info:
if face_info.frame_count != self.frame_count:
face_info.update(None, None, self.frame_count)
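    # Main entry point. Per frame: (re)detect faces as needed, crop and
    # preprocess each candidate, run landmark inference (threaded when there
    # are multiple crops), deduplicate overlapping detections, assign results
    # to FaceInfo slots and estimate 3D pose for each confident face.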
def predict(self, frame, additional_faces=[]):
self.frame_count += 1
start = time.perf_counter()
im = frame
duration_fd = 0.0
duration_pp = 0.0
duration_model = 0.0
duration_pnp = 0.0
new_faces = []
new_faces.extend(self.faces)
bonus_cutoff = len(self.faces)
new_faces.extend(additional_faces)
self.wait_count += 1
if self.detected == 0:
start_fd = time.perf_counter()
if self.use_retinaface > 0 or self.try_hard:
retinaface_detections = self.retinaface.detect_retina(frame)
new_faces.extend(retinaface_detections)
if self.use_retinaface == 0 or self.try_hard:
new_faces.extend(self.detect_faces(frame))
if self.try_hard:
new_faces.extend([(0, 0, self.width, self.height)])
duration_fd = 1000 * (time.perf_counter() - start_fd)
self.wait_count = 0
elif self.detected < self.max_faces:
if self.use_retinaface > 0:
new_faces.extend(self.retinaface_scan.get_results())
if self.wait_count >= self.scan_every:
if self.use_retinaface > 0:
self.retinaface_scan.background_detect(frame)
else:
start_fd = time.perf_counter()
new_faces.extend(self.detect_faces(frame))
duration_fd = 1000 * (time.perf_counter() - start_fd)
self.wait_count = 0
else:
self.wait_count = 0
if len(new_faces) < 1:
duration = (time.perf_counter() - start) * 1000
if not self.silent:
print(f"Took {duration:.2f}ms")
return []
crops = []
crop_info = []
num_crops = 0
for j, (x,y,w,h) in enumerate(new_faces):
crop_x1 = x - int(w * 0.1)
crop_y1 = y - int(h * 0.125)
crop_x2 = x + w + int(w * 0.1)
crop_y2 = y + h + int(h * 0.125)
crop_x1, crop_y1 = clamp_to_im((crop_x1, crop_y1), self.width, self.height)
crop_x2, crop_y2 = clamp_to_im((crop_x2, crop_y2), self.width, self.height)
scale_x = float(crop_x2 - crop_x1) / self.res
scale_y = float(crop_y2 - crop_y1) / self.res
if crop_x2 - crop_x1 < 4 or crop_y2 - crop_y1 < 4:
continue
start_pp = time.perf_counter()
crop = self.preprocess(im, (crop_x1, crop_y1, crop_x2, crop_y2))
duration_pp += 1000 * (time.perf_counter() - start_pp)
crops.append(crop)
crop_info.append((crop_x1, crop_y1, scale_x, scale_y, 0.0 if j >= bonus_cutoff else 0.1))
num_crops += 1
start_model = time.perf_counter()
outputs = {}
if num_crops == 1:
output = self.session.run([], {self.input_name: crops[0]})[0]
conf, lms = self.landmarks(output[0], crop_info[0])
if conf > self.threshold:
try:
eye_state = self.get_eye_state(frame, lms)
                except Exception:
eye_state = [(1.0, 0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0)]
outputs[crop_info[0]] = (conf, (lms, eye_state), 0)
else:
started = 0
results = queue.Queue()
for i in range(min(num_crops, self.max_workers)):
thread = threading.Thread(target=worker_thread, args=(self.sessions[started], frame, crops[started], crop_info[started], results, self.input_name, started, self))
started += 1
thread.start()
returned = 0
while returned < num_crops:
result = results.get(True)
if len(result) != 1:
session, conf, lms, sample_crop_info, idx = result
outputs[sample_crop_info] = (conf, lms, idx)
else:
session = result[0]
returned += 1
if started < num_crops:
thread = threading.Thread(target=worker_thread, args=(session, frame, crops[started], crop_info[started], results, self.input_name, started, self))
started += 1
thread.start()
actual_faces = []
good_crops = []
for crop in crop_info:
if crop not in outputs:
continue
conf, lms, i = outputs[crop]
x1, y1, _ = lms[0].min(0)
x2, y2, _ = lms[0].max(0)
bb = (x1, y1, x2 - x1, y2 - y1)
outputs[crop] = (conf, lms, i, bb)
actual_faces.append(bb)
good_crops.append(crop)
groups = group_rects(actual_faces)
best_results = {}
for crop in good_crops:
conf, lms, i, bb = outputs[crop]
if conf < self.threshold:
                continue
group_id = groups[str(bb)][0]
if not group_id in best_results:
best_results[group_id] = [-1, [], 0]
if conf > self.threshold and best_results[group_id][0] < conf + crop[4]:
best_results[group_id][0] = conf + crop[4]
best_results[group_id][1] = lms
best_results[group_id][2] = crop[4]
sorted_results = sorted(best_results.values(), key=lambda x: x[0], reverse=True)[:self.max_faces]
self.assign_face_info(sorted_results)
duration_model = 1000 * (time.perf_counter() - start_model)
results = []
detected = []
start_pnp = time.perf_counter()
for face_info in self.face_info:
results.append(face_info)
if face_info.alive and face_info.conf > self.threshold:
face_info.success, face_info.quaternion, face_info.euler, face_info.pnp_error, face_info.pts_3d, face_info.lms = self.estimate_depth(face_info)
face_info.adjust_3d()
lms = face_info.lms[:, 0:2]
x1, y1 = tuple(lms[0:66].min(0))
x2, y2 = tuple(lms[0:66].max(0))
bbox = (y1, x1, y2 - y1, x2 - x1)
face_info.bbox = bbox
detected.append(bbox)
duration_pnp += 1000 * (time.perf_counter() - start_pnp)
if len(detected) > 0:
self.detected = len(detected)
self.faces = detected
self.discard = 0
else:
self.detected = 0
self.discard += 1
if self.discard > self.discard_after:
self.faces = []
else:
if self.bbox_growth > 0:
faces = []
for (x,y,w,h) in self.faces:
x -= w * self.bbox_growth
y -= h * self.bbox_growth
w += 2 * w * self.bbox_growth
h += 2 * h * self.bbox_growth
faces.append((x,y,w,h))
self.faces = faces
self.faces = [x for x in self.faces if not np.isnan(np.array(x)).any()]
self.detected = len(self.faces)
duration = (time.perf_counter() - start) * 1000
if not self.silent:
print(f"Took {duration:.2f}ms (detect: {duration_fd:.2f}ms, crop: {duration_pp:.2f}ms, track: {duration_model:.2f}ms, 3D points: {duration_pnp:.2f}ms)")
results = sorted(results, key=lambda x: x.id)
return results