| import os |
| import random |
| import time |
| from typing import List, Dict, Any, Optional |
|
|
| import cv2 |
| import numpy as np |
|
|
| import config |
| from config import logger, MODELS_PATH, OUTPUT_DIR, DEEPFACE_AVAILABLE, \ |
| YOLO_AVAILABLE |
| from facial_analyzer import FacialFeatureAnalyzer |
| from models import ModelType |
| from utils import save_image_high_quality, get_date_subfolder |
|
|
| if DEEPFACE_AVAILABLE: |
| from deepface import DeepFace |
|
|
| |
| if YOLO_AVAILABLE: |
| try: |
| from ultralytics import YOLO |
|
|
| YOLO_AVAILABLE = True |
| except ImportError: |
| YOLO_AVAILABLE = False |
| YOLO = None |
| print("Warning: ENABLE_YOLO=true but ultralytics not available") |
|
|
|
|
| class EnhancedFaceAnalyzer: |
| """增强版人脸分析器 - 支持混合模型""" |
|
|
| def __init__(self, models_dir: str = MODELS_PATH): |
| """ |
| 初始化人脸分析器 |
| :param models_dir: 模型文件目录 |
| """ |
| start_time = time.perf_counter() |
| self.models_dir = models_dir |
| self.MODEL_MEAN_VALUES = (104, 117, 123) |
| self.age_list = [ |
| "(0-2)", |
| "(4-6)", |
| "(8-12)", |
| "(15-20)", |
| "(25-32)", |
| "(38-43)", |
| "(48-53)", |
| "(60-100)", |
| ] |
| self.gender_list = ["Male", "Female"] |
| |
| self.gender_colors = { |
| "Male": (255, 165, 0), |
| "Female": (255, 0, 255), |
| } |
|
|
| |
| self.facial_analyzer = FacialFeatureAnalyzer() |
| |
| self._load_howcuteami_models() |
| |
| self._load_yolo_model() |
|
|
| |
| if getattr(config, "ENABLE_WARMUP", False): |
| self._warmup_models() |
|
|
| init_time = time.perf_counter() - start_time |
| logger.info(f"EnhancedFaceAnalyzer initialized successfully, time: {init_time:.3f}s") |
|
|
| def _cap_conf(self, value: float) -> float: |
| """将置信度限制在 [0, 0.9999] 并保留4位小数。""" |
| try: |
| v = float(value if value is not None else 0.0) |
| except Exception: |
| v = 0.0 |
| if v >= 1.0: |
| v = 0.9999 |
| if v < 0.0: |
| v = 0.0 |
| return round(v, 4) |
|
|
| def _adjust_beauty_score(self, score: float) -> float: |
| try: |
| if not config.BEAUTY_ADJUST_ENABLED: |
| return score |
| |
| low = float(getattr(config, "BEAUTY_ADJUST_MIN", 6.0)) |
| high = float(getattr(config, "BEAUTY_ADJUST_MAX", getattr(config, "BEAUTY_ADJUST_THRESHOLD", 8.0))) |
| gamma = float(getattr(config, "BEAUTY_ADJUST_GAMMA", 0.3)) |
| gamma = max(0.0001, min(1.0, gamma)) |
|
|
| |
| if not (0.0 <= low < high <= 10.0): |
| return score |
|
|
| |
| if score < low: |
| return score |
| if score < high: |
| |
| adjusted = high - gamma * (high - score) |
| adjusted = round(min(10.0, max(0.0, adjusted)), 1) |
| try: |
| logger.info( |
| f"beauty_score adjusted: original={score:.1f} -> adjusted={adjusted:.1f} " |
| f"(range=[{low:.1f},{high:.1f}], gamma={gamma:.3f})" |
| ) |
| except Exception: |
| pass |
| return adjusted |
| return score |
| except Exception: |
| return score |
|
|
| def _load_yolo_model(self): |
| """加载YOLOv人脸检测模型""" |
| self.yolo_model = None |
| if config.YOLO_AVAILABLE: |
| try: |
| |
| yolo_face_path = os.path.join(self.models_dir, config.YOLO_MODEL) |
|
|
| if os.path.exists(yolo_face_path): |
| self.yolo_model = YOLO(yolo_face_path) |
| logger.info(f"Local YOLO face model loaded successfully: {yolo_face_path}") |
| else: |
| |
| logger.info("Local YOLO face model does not exist, attempting to download...") |
| try: |
| |
| model_name = "yolov11n-face.pt" |
| self.yolo_model = YOLO(model_name) |
| logger.info( |
| f"YOLOv8 general model loaded successfully (detecting 'person' class as face regions)" |
| ) |
| except Exception as e: |
| logger.warning(f"YOLOv model download failed: {e}") |
|
|
| except Exception as e: |
| logger.error(f"YOLOv model loading failed: {e}") |
| else: |
| logger.warning("ultralytics not installed, cannot use YOLOv") |
|
|
| def _load_howcuteami_models(self): |
| """加载HowCuteAmI深度学习模型""" |
| try: |
| |
| face_proto = os.path.join(self.models_dir, "opencv_face_detector.pbtxt") |
| face_model = os.path.join(self.models_dir, "opencv_face_detector_uint8.pb") |
| self.face_net = cv2.dnn.readNet(face_model, face_proto) |
|
|
| |
| age_proto = os.path.join(self.models_dir, "age_googlenet.prototxt") |
| age_model = os.path.join(self.models_dir, "age_googlenet.caffemodel") |
| self.age_net = cv2.dnn.readNet(age_model, age_proto) |
|
|
| |
| gender_proto = os.path.join(self.models_dir, "gender_googlenet.prototxt") |
| gender_model = os.path.join(self.models_dir, "gender_googlenet.caffemodel") |
| self.gender_net = cv2.dnn.readNet(gender_model, gender_proto) |
|
|
| |
| beauty_proto = os.path.join(self.models_dir, "beauty_resnet.prototxt") |
| beauty_model = os.path.join(self.models_dir, "beauty_resnet.caffemodel") |
| self.beauty_net = cv2.dnn.readNet(beauty_model, beauty_proto) |
|
|
| logger.info("HowCuteAmI model loaded successfully!") |
|
|
| except Exception as e: |
| logger.error(f"HowCuteAmI model loading failed: {e}") |
| raise e |
|
|
| |
| def _detect_faces( |
| self, frame: np.ndarray, conf_threshold: float = config.FACE_CONFIDENCE |
| ) -> List[List[int]]: |
| """ |
| 使用YOLO进行人脸检测,如果失败则回退到OpenCV DNN |
| """ |
| |
| face_boxes = [] |
| if self.yolo_model is not None: |
| try: |
| results = self.yolo_model(frame, conf=conf_threshold, verbose=False) |
| for result in results: |
| boxes = result.boxes |
| if boxes is not None: |
| for box in boxes: |
| |
| class_id = int(box.cls[0]) |
| |
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int) |
| confidence = float(box.conf[0]) |
| logger.info( |
| f"detect class_id={class_id}, confidence={confidence}" |
| ) |
| |
| frame_height, frame_width = frame.shape[:2] |
| x1 = max(0, int(x1)) |
| y1 = max(0, int(y1)) |
| x2 = min(frame_width, int(x2)) |
| y2 = min(frame_height, int(y2)) |
|
|
| |
| width, height = x2 - x1, y2 - y1 |
| if ( |
| width > 30 and height > 30 |
| ): |
| |
| if self._is_likely_face_region(x1, y1, x2, y2, frame): |
| face_boxes.append(self._scale_box([x1, y1, x2, y2])) |
| logger.info( |
| f"YOLO detected {len(face_boxes)} faces, conf_threshold={conf_threshold}" |
| ) |
| if face_boxes: |
| return face_boxes |
|
|
| except Exception as e: |
| logger.warning(f"YOLO detection failed, falling back to OpenCV DNN: {e}") |
| return self._detect_faces_opencv_fallback(frame, conf_threshold) |
|
|
| return face_boxes |
|
|
| def _is_likely_face_region( |
| self, x1: int, y1: int, x2: int, y2: int, frame: np.ndarray |
| ) -> bool: |
| """ |
| 判断检测区域是否可能是人脸区域(当使用通用YOLO模型时) |
| """ |
| width, height = x2 - x1, y2 - y1 |
|
|
| |
| aspect_ratio = width / height |
| if not (0.6 <= aspect_ratio <= 1.6): |
| return False |
|
|
| |
| frame_height = frame.shape[0] |
| center_y = (y1 + y2) / 2 |
| if center_y > frame_height * 0.8: |
| return False |
|
|
| |
| frame_width, frame_height = frame.shape[1], frame.shape[0] |
| if width > frame_width * 0.8 or height > frame_height * 0.8: |
| return False |
|
|
| return True |
|
|
| def _detect_faces_opencv_fallback( |
| self, frame: np.ndarray, conf_threshold: float = 0.5 |
| ) -> List[List[int]]: |
| """ |
| 优化版人脸检测 - 支持多尺度检测和小人脸识别 |
| """ |
| frame_height, frame_width = frame.shape[:2] |
| all_boxes = [] |
|
|
| |
| detection_configs = [ |
| {"size": (300, 300), "threshold": conf_threshold}, |
| { |
| "size": (416, 416), |
| "threshold": max(0.3, conf_threshold - 0.2), |
| }, |
| { |
| "size": (512, 512), |
| "threshold": max(0.25, conf_threshold - 0.25), |
| }, |
| ] |
| logger.info(f"Detecting faces using opencv, conf_threshold={conf_threshold}") |
| for config in detection_configs: |
| try: |
| |
| processed_frame = cv2.convertScaleAbs(frame, alpha=1.1, beta=10) |
|
|
| blob = cv2.dnn.blobFromImage( |
| processed_frame, 1.0, config["size"], [104, 117, 123], True, False |
| ) |
| self.face_net.setInput(blob) |
| detections = self.face_net.forward() |
|
|
| |
| for i in range(detections.shape[2]): |
| confidence = detections[0, 0, i, 2] |
| if confidence > config["threshold"]: |
| x1 = int(detections[0, 0, i, 3] * frame_width) |
| y1 = int(detections[0, 0, i, 4] * frame_height) |
| x2 = int(detections[0, 0, i, 5] * frame_width) |
| y2 = int(detections[0, 0, i, 6] * frame_height) |
|
|
| |
| x1, y1 = max(0, x1), max(0, y1) |
| x2, y2 = min(frame_width, x2), min(frame_height, y2) |
|
|
| |
| width, height = x2 - x1, y2 - y1 |
| if ( |
| width > 20 |
| and height > 20 |
| and width < frame_width * 0.8 |
| and height < frame_height * 0.8 |
| ): |
| |
| aspect_ratio = width / height |
| if 0.6 <= aspect_ratio <= 1.8: |
| all_boxes.append( |
| { |
| "box": [x1, y1, x2, y2], |
| "confidence": confidence, |
| "area": width * height, |
| } |
| ) |
| except Exception as e: |
| logger.warning(f"Scale {config['size']} detection failed: {e}") |
| continue |
|
|
| |
| if not all_boxes: |
| logger.info("No faces detected, trying more relaxed detection conditions...") |
| try: |
| |
| enhanced_frame = cv2.equalizeHist( |
| cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
| ) |
| enhanced_frame = cv2.cvtColor(enhanced_frame, cv2.COLOR_GRAY2BGR) |
|
|
| blob = cv2.dnn.blobFromImage( |
| enhanced_frame, 1.0, (300, 300), [104, 117, 123], True, False |
| ) |
| self.face_net.setInput(blob) |
| detections = self.face_net.forward() |
|
|
| for i in range(detections.shape[2]): |
| confidence = detections[0, 0, i, 2] |
| if confidence > 0.15: |
| x1 = int(detections[0, 0, i, 3] * frame_width) |
| y1 = int(detections[0, 0, i, 4] * frame_height) |
| x2 = int(detections[0, 0, i, 5] * frame_width) |
| y2 = int(detections[0, 0, i, 6] * frame_height) |
|
|
| x1, y1 = max(0, x1), max(0, y1) |
| x2, y2 = min(frame_width, x2), min(frame_height, y2) |
|
|
| width, height = x2 - x1, y2 - y1 |
| if width > 15 and height > 15: |
| aspect_ratio = width / height |
| if 0.5 <= aspect_ratio <= 2.0: |
| all_boxes.append( |
| { |
| "box": [x1, y1, x2, y2], |
| "confidence": confidence, |
| "area": width * height, |
| } |
| ) |
| except Exception as e: |
| logger.warning(f"Relaxed condition detection also failed: {e}") |
|
|
| |
| if all_boxes: |
| final_boxes = self._apply_nms(all_boxes, overlap_threshold=0.4) |
| return [self._scale_box(box["box"]) for box in final_boxes] |
|
|
| return [] |
|
|
| def _apply_nms( |
| self, detections: List[Dict], overlap_threshold: float = 0.4 |
| ) -> List[Dict]: |
| """ |
| 非极大值抑制,去除重复的检测框 |
| """ |
| if not detections: |
| return [] |
|
|
| |
| detections.sort(key=lambda x: x["confidence"], reverse=True) |
|
|
| keep = [] |
| while detections: |
| |
| best = detections.pop(0) |
| keep.append(best) |
|
|
| |
| remaining = [] |
| for det in detections: |
| if self._calculate_iou(best["box"], det["box"]) < overlap_threshold: |
| remaining.append(det) |
| detections = remaining |
|
|
| return keep |
|
|
| def _calculate_iou(self, box1: List[int], box2: List[int]) -> float: |
| """ |
| 计算两个边界框的IoU (交并比) |
| """ |
| x1_1, y1_1, x2_1, y2_1 = box1 |
| x1_2, y1_2, x2_2, y2_2 = box2 |
|
|
| |
| x1_i = max(x1_1, x1_2) |
| y1_i = max(y1_1, y1_2) |
| x2_i = min(x2_1, x2_2) |
| y2_i = min(y2_1, y2_2) |
|
|
| if x2_i <= x1_i or y2_i <= y1_i: |
| return 0.0 |
|
|
| intersection = (x2_i - x1_i) * (y2_i - y1_i) |
|
|
| |
| area1 = (x2_1 - x1_1) * (y2_1 - y1_1) |
| area2 = (x2_2 - x1_2) * (y2_2 - y1_2) |
| union = area1 + area2 - intersection |
|
|
| return intersection / union if union > 0 else 0.0 |
|
|
| def _scale_box(self, box: List[int]) -> List[int]: |
| """将矩形框缩放为正方形""" |
| width = box[2] - box[0] |
| height = box[3] - box[1] |
| maximum = max(width, height) |
| dx = int((maximum - width) / 2) |
| dy = int((maximum - height) / 2) |
|
|
| return [box[0] - dx, box[1] - dy, box[2] + dx, box[3] + dy] |
|
|
| def _crop_face(self, image: np.ndarray, box: List[int]) -> np.ndarray: |
| """裁剪人脸区域""" |
| x1, y1, x2, y2 = box |
| h, w = image.shape[:2] |
| x1 = max(0, x1) |
| y1 = max(0, y1) |
| x2 = min(w, x2) |
| y2 = min(h, y2) |
| return image[y1:y2, x1:x2] |
|
|
| def _predict_beauty_gender_with_howcuteami( |
| self, face: np.ndarray |
| ) -> Dict[str, Any]: |
| """使用HowCuteAmI模型预测颜值和性别""" |
| try: |
| blob = cv2.dnn.blobFromImage( |
| face, 1.0, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False |
| ) |
|
|
| |
| self.gender_net.setInput(blob) |
| gender_preds = self.gender_net.forward() |
| gender = self.gender_list[gender_preds[0].argmax()] |
| gender_confidence = float(np.max(gender_preds[0])) |
| gender_confidence = self._cap_conf(gender_confidence) |
| |
| self.age_net.setInput(blob) |
| age_preds = self.age_net.forward() |
| age = self.age_list[age_preds[0].argmax()] |
| age_confidence = float(np.max(age_preds[0])) |
| |
| blob_beauty = cv2.dnn.blobFromImage( |
| face, 1.0 / 255, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False |
| ) |
| self.beauty_net.setInput(blob_beauty) |
| beauty_preds = self.beauty_net.forward() |
| beauty_score = round(float(2.0 * np.sum(beauty_preds[0])), 1) |
| beauty_score = min(10.0, max(0.0, beauty_score)) |
| beauty_score = self._adjust_beauty_score(beauty_score) |
| raw_score = float(np.sum(beauty_preds[0])) |
|
|
| return { |
| "age": age, |
| "age_confidence": round(age_confidence, 4), |
| "gender": gender, |
| "gender_confidence": gender_confidence, |
| "beauty_score": beauty_score, |
| "beauty_raw_score": round(raw_score, 4), |
| "age_model_used": "HowCuteAmI", |
| "gender_model_used": "HowCuteAmI", |
| "beauty_model_used": "HowCuteAmI", |
| } |
| except Exception as e: |
| logger.error(f"HowCuteAmI beauty gender prediction failed: {e}") |
| raise e |
|
|
| def _predict_age_emotion_with_deepface( |
| self, face_image: np.ndarray, include_emotion: bool = True, |
| initial_age_confidence: Optional[float] = None |
| ) -> Dict[str, Any]: |
| """使用DeepFace预测年龄、情绪(并返回可用的性别信息用于回退)""" |
| if not DEEPFACE_AVAILABLE: |
| |
| return self._predict_age_with_howcuteami_fallback(face_image) |
|
|
| if face_image is None or face_image.size == 0: |
| raise ValueError("无效的人脸图像") |
|
|
| try: |
| actions = ["age", "gender", "emotion"] if include_emotion else ["age", "gender"] |
| |
| result = DeepFace.analyze( |
| img_path=face_image, |
| actions=actions, |
| enforce_detection=False, |
| detector_backend="skip", |
| silent=True |
| ) |
|
|
| |
| if isinstance(result, list): |
| result = result[0] |
|
|
| |
| age = result.get("age", 25) |
| if include_emotion: |
| emotion = result.get("dominant_emotion", "neutral") |
| emotion_scores = result.get("emotion", {}) or {} |
| else: |
| emotion = "neutral" |
| emotion_scores = {"neutral": 100.0} |
| |
| deep_gender = result.get("dominant_gender", "Woman") |
| deep_gender_conf = result.get("gender", {}).get(deep_gender, 50.0) / 100.0 |
| deep_gender_conf = self._cap_conf(deep_gender_conf) |
| if str(deep_gender).lower() in ["woman", "female"]: |
| deep_gender = "Female" |
| else: |
| deep_gender = "Male" |
|
|
| |
| age_conf = initial_age_confidence if initial_age_confidence is not None else 0.85 |
| return { |
| "age": str(int(age)), |
| "age_confidence": round(age_conf, 4), |
| "emotion": emotion, |
| "emotion_analysis": emotion_scores, |
| "gender": deep_gender, |
| "gender_confidence": deep_gender_conf, |
| } |
| except Exception as e: |
| logger.error(f"DeepFace age emotion prediction failed, falling back to HowCuteAmI: {e}") |
| return self._predict_age_with_howcuteami_fallback(face_image) |
|
|
| def _predict_age_with_howcuteami_fallback( |
| self, face_image: np.ndarray |
| ) -> Dict[str, Any]: |
| """HowCuteAmI年龄预测回退方案""" |
| try: |
| if face_image is None or face_image.size == 0: |
| raise ValueError("无法读取人脸图像") |
|
|
| face_resized = cv2.resize(face_image, (224, 224)) |
| blob = cv2.dnn.blobFromImage( |
| face_resized, 1.0, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False |
| ) |
|
|
| |
| self.age_net.setInput(blob) |
| age_preds = self.age_net.forward() |
| age = self.age_list[age_preds[0].argmax()] |
| age_confidence = float(np.max(age_preds[0])) |
|
|
| return { |
| "age": age[1:-1], |
| "age_confidence": round(age_confidence, 4), |
| "emotion": "neutral", |
| "emotion_analysis": {"neutral": 100.0}, |
| } |
| except Exception as e: |
| logger.error(f"HowCuteAmI age prediction fallback failed: {e}") |
| return { |
| "age": "25-32", |
| "age_confidence": 0.5, |
| "emotion": "neutral", |
| "emotion_analysis": {"neutral": 100.0}, |
| } |
|
|
| def _predict_with_hybrid_model( |
| self, face: np.ndarray, face_image: np.ndarray |
| ) -> Dict[str, Any]: |
| """混合模型预测:HowCuteAmI(颜值+性别)+ DeepFace(年龄+情绪,年龄置信度低时优先使用)""" |
| |
| beauty_gender_result = self._predict_beauty_gender_with_howcuteami(face) |
|
|
| |
| deepface_emotion_enabled = bool(getattr(config, "DEEPFACE_EMOTION_ENABLED", True)) |
| age_emotion_result: Optional[Dict[str, Any]] = None |
|
|
| |
| howcuteami_age_confidence = beauty_gender_result.get("age_confidence", 0) |
| gender_confidence = beauty_gender_result.get("gender_confidence", 0) |
| if gender_confidence >= 1: |
| gender_confidence = 0.9999 |
| age = beauty_gender_result["age"] |
|
|
| |
| agec = config.AGE_CONFIDENCE |
| if howcuteami_age_confidence < agec: |
| |
| age_emotion_result = self._predict_age_emotion_with_deepface( |
| face_image, include_emotion=deepface_emotion_enabled, |
| initial_age_confidence=howcuteami_age_confidence |
| ) |
| deep_age = age_emotion_result["age"] |
| logger.info( |
| f"HowCuteAmI age confidence ({howcuteami_age_confidence}) below {agec}, value=({age}); using DeepFace for age prediction, value={deep_age}" |
| ) |
| |
| result = { |
| "gender": beauty_gender_result["gender"], |
| "gender_confidence": self._cap_conf(gender_confidence), |
| "beauty_score": beauty_gender_result["beauty_score"], |
| "beauty_raw_score": beauty_gender_result["beauty_raw_score"], |
| "age": deep_age, |
| "age_confidence": age_emotion_result["age_confidence"], |
| "emotion": age_emotion_result.get("emotion") or "neutral", |
| "emotion_analysis": age_emotion_result.get("emotion_analysis") or {"neutral": 100.0}, |
| "model_used": "hybrid_deepface_age", |
| "age_model_used": "DeepFace", |
| "gender_model_used": "HowCuteAmI", |
| } |
| else: |
| |
| logger.info( |
| f"HowCuteAmI age confidence ({howcuteami_age_confidence}) is high enough, value={age}; using HowCuteAmI for age prediction" |
| ) |
| |
| if deepface_emotion_enabled: |
| age_emotion_result = self._predict_age_emotion_with_deepface( |
| face_image, include_emotion=True, |
| initial_age_confidence=howcuteami_age_confidence |
| ) |
| emotion = age_emotion_result.get("emotion") or "neutral" |
| emotion_analysis = age_emotion_result.get("emotion_analysis") or {"neutral": 100.0} |
| else: |
| emotion = "neutral" |
| emotion_analysis = {"neutral": 100.0} |
| |
| result = { |
| "gender": beauty_gender_result["gender"], |
| "gender_confidence": self._cap_conf(gender_confidence), |
| "beauty_score": beauty_gender_result["beauty_score"], |
| "beauty_raw_score": beauty_gender_result["beauty_raw_score"], |
| "age": beauty_gender_result["age"], |
| "age_confidence": beauty_gender_result["age_confidence"], |
| "emotion": emotion, |
| "emotion_analysis": emotion_analysis, |
| "model_used": "hybrid", |
| "age_model_used": "HowCuteAmI", |
| "gender_model_used": "HowCuteAmI", |
| } |
|
|
| |
| try: |
| how_gender = beauty_gender_result.get("gender") |
| how_conf = float(beauty_gender_result.get("gender_confidence", 0) or 0) |
| deep_gender = age_emotion_result.get("gender") if age_emotion_result else None |
| deep_conf = float(age_emotion_result.get("gender_confidence", 0) or 0) if age_emotion_result else 0.0 |
|
|
| final_gender = result.get("gender") |
| final_conf = float(result.get("gender_confidence", 0) or 0) |
| |
| if (str(how_gender) == "Female") or (str(deep_gender) == "Female"): |
| final_gender = "Female" |
| final_conf = max(how_conf if how_gender == "Female" else 0, |
| deep_conf if deep_gender == "Female" else 0) |
| result["gender_model_used"] = "Combined(H+DF)" |
| elif (str(how_gender) == "Male") and (str(deep_gender) == "Male"): |
| final_gender = "Male" |
| final_conf = max(how_conf if how_gender == "Male" else 0, |
| deep_conf if deep_gender == "Male" else 0) |
| result["gender_model_used"] = "Combined(H+DF)" |
| |
|
|
| result["gender"] = final_gender |
| result["gender_confidence"] = self._cap_conf(final_conf) |
| except Exception: |
| pass |
|
|
| return result |
|
|
| def _predict_with_howcuteami(self, face: np.ndarray) -> Dict[str, Any]: |
| """使用HowCuteAmI模型进行完整预测""" |
| try: |
| |
| blob = cv2.dnn.blobFromImage( |
| face, 1.0, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False |
| ) |
| self.gender_net.setInput(blob) |
| gender_preds = self.gender_net.forward() |
| gender = self.gender_list[gender_preds[0].argmax()] |
| gender_confidence = float(np.max(gender_preds[0])) |
| gender_confidence = self._cap_conf(gender_confidence) |
|
|
| |
| self.age_net.setInput(blob) |
| age_preds = self.age_net.forward() |
| age = self.age_list[age_preds[0].argmax()] |
| age_confidence = float(np.max(age_preds[0])) |
|
|
| |
| blob_beauty = cv2.dnn.blobFromImage( |
| face, 1.0 / 255, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False |
| ) |
| self.beauty_net.setInput(blob_beauty) |
| beauty_preds = self.beauty_net.forward() |
| beauty_score = round(float(2.0 * np.sum(beauty_preds[0])), 1) |
| beauty_score = min(10.0, max(0.0, beauty_score)) |
| beauty_score = self._adjust_beauty_score(beauty_score) |
| raw_score = float(np.sum(beauty_preds[0])) |
|
|
| return { |
| "gender": gender, |
| "gender_confidence": gender_confidence, |
| "age": age[1:-1], |
| "age_confidence": round(age_confidence, 4), |
| "beauty_score": beauty_score, |
| "beauty_raw_score": round(raw_score, 4), |
| "model_used": "HowCuteAmI", |
| "emotion": "neutral", |
| "emotion_analysis": {"neutral": 100.0}, |
| "age_model_used": "HowCuteAmI", |
| "gender_model_used": "HowCuteAmI", |
| "beauty_model_used": "HowCuteAmI", |
| } |
| except Exception as e: |
| logger.error(f"HowCuteAmI prediction failed: {e}") |
| raise e |
|
|
| def _predict_with_deepface(self, face_image: np.ndarray) -> Dict[str, Any]: |
| """使用DeepFace进行预测""" |
| if not DEEPFACE_AVAILABLE: |
| raise ValueError("DeepFace未安装") |
|
|
| if face_image is None or face_image.size == 0: |
| raise ValueError("无效的人脸图像") |
|
|
| try: |
| deepface_emotion_enabled = bool(getattr(config, "DEEPFACE_EMOTION_ENABLED", True)) |
| actions = ["age", "gender", "emotion"] if deepface_emotion_enabled else ["age", "gender"] |
| |
| result = DeepFace.analyze( |
| img_path=face_image, |
| actions=actions, |
| enforce_detection=False, |
| detector_backend="skip", |
| silent=True |
| ) |
|
|
| |
| if isinstance(result, list): |
| result = result[0] |
|
|
| |
| age = result.get("age", 25) |
| gender = result.get("dominant_gender", "Woman") |
| gender_confidence = result.get("gender", {}).get(gender, 0.5) / 100 |
| gender_confidence = self._cap_conf(gender_confidence) |
|
|
| |
| if gender.lower() in ["woman", "female"]: |
| gender = "Female" |
| else: |
| gender = "Male" |
|
|
| |
| if deepface_emotion_enabled: |
| emotion = result.get("dominant_emotion", "neutral") |
| emotion_scores = result.get("emotion", {}) or {} |
| else: |
| emotion = "neutral" |
| emotion_scores = {"neutral": 100.0} |
|
|
| |
| happiness_score = emotion_scores.get("happy", 0) / 100 |
| neutral_score = emotion_scores.get("neutral", 0) / 100 |
|
|
| |
| base_beauty = 6.0 |
| emotion_bonus = happiness_score * 2 + neutral_score * 1 |
| age_factor = max(0.5, 1 - abs(age - 25) / 50) |
|
|
| beauty_score = round(min(10.0, base_beauty + emotion_bonus + age_factor), 2) |
|
|
| |
| age_conf = 0.85 |
| return { |
| "gender": gender, |
| "gender_confidence": gender_confidence, |
| "age": str(int(age)), |
| "age_confidence": age_conf, |
| "beauty_score": beauty_score, |
| "beauty_raw_score": round(beauty_score / 10, 4), |
| "model_used": "DeepFace", |
| "emotion": emotion, |
| "emotion_analysis": emotion_scores, |
| "age_model_used": "DeepFace", |
| "gender_model_used": "DeepFace", |
| "beauty_model_used": "Heuristic", |
| } |
| except Exception as e: |
| logger.error(f"DeepFace prediction failed: {e}") |
| raise e |
|
|
| def analyze_faces( |
| self, |
| image: np.ndarray, |
| original_image_hash: str, |
| model_type: ModelType = ModelType.HYBRID, |
| ) -> Dict[str, Any]: |
| """ |
| 分析图片中的人脸 |
| :param image: 输入图像 |
| :param original_image_hash: 原始图片的MD5哈希值 |
| :param model_type: 使用的模型类型 |
| :return: 分析结果 |
| """ |
| if image is None: |
| raise ValueError("invalid image input") |
|
|
| |
| face_boxes = self._detect_faces(image) |
|
|
| if not face_boxes: |
| return { |
| "success": False, |
| "message": "Please upload a clear front-facing photo without any obstructions.", |
| "face_count": 0, |
| "faces": [], |
| "annotated_image": None, |
| "model_used": model_type.value, |
| } |
|
|
| results = { |
| "success": True, |
| "message": f"success detected {len(face_boxes)} faces", |
| "face_count": len(face_boxes), |
| "faces": [], |
| "model_used": model_type.value, |
| } |
|
|
| |
| annotated_image = image.copy() |
| logger.info( |
| f"Input annotated_image shape: {annotated_image.shape}, dtype: {annotated_image.dtype}, ndim: {annotated_image.ndim}" |
| ) |
| |
| for i, face_box in enumerate(face_boxes): |
| |
| face_cropped = self._crop_face(image, face_box) |
| if face_cropped.size == 0: |
| logger.warning(f"Cropped face {i + 1} is empty, skipping.") |
| continue |
|
|
| face_resized = cv2.resize(face_cropped, (224, 224)) |
| face_for_deepface = face_cropped.copy() |
|
|
| |
| try: |
| if model_type == ModelType.HYBRID: |
| |
| prediction_result = self._predict_with_hybrid_model( |
| face_resized, face_for_deepface |
| ) |
| elif model_type == ModelType.HOWCUTEAMI: |
| prediction_result = self._predict_with_howcuteami(face_resized) |
| |
| try: |
| age_emotion_result = self._predict_age_emotion_with_deepface( |
| face_for_deepface, include_emotion=False |
| ) |
| how_gender = prediction_result.get("gender") |
| how_conf = float(prediction_result.get("gender_confidence", 0) or 0) |
| deep_gender = age_emotion_result.get("gender") |
| deep_conf = float(age_emotion_result.get("gender_confidence", 0) or 0) |
| final_gender = prediction_result.get("gender") |
| final_conf = float(prediction_result.get("gender_confidence", 0) or 0) |
| if (str(how_gender) == "Female") or (str(deep_gender) == "Female"): |
| final_gender = "Female" |
| final_conf = max( |
| how_conf if how_gender == "Female" else 0, |
| deep_conf if deep_gender == "Female" else 0, |
| ) |
| prediction_result["gender_model_used"] = "Combined(H+DF)" |
| elif (str(how_gender) == "Male") and (str(deep_gender) == "Male"): |
| final_gender = "Male" |
| final_conf = max( |
| how_conf if how_gender == "Male" else 0, |
| deep_conf if deep_gender == "Male" else 0, |
| ) |
| prediction_result["gender_model_used"] = "Combined(H+DF)" |
| prediction_result["gender"] = final_gender |
| prediction_result["gender_confidence"] = round(float(final_conf), 4) |
| except Exception: |
| pass |
| elif model_type == ModelType.DEEPFACE and DEEPFACE_AVAILABLE: |
| prediction_result = self._predict_with_deepface(face_for_deepface) |
| |
| try: |
| beauty_gender_result = self._predict_beauty_gender_with_howcuteami( |
| face_resized |
| ) |
| deep_gender = prediction_result.get("gender") |
| deep_conf = float(prediction_result.get("gender_confidence", 0) or 0) |
| how_gender = beauty_gender_result.get("gender") |
| how_conf = float(beauty_gender_result.get("gender_confidence", 0) or 0) |
| final_gender = prediction_result.get("gender") |
| final_conf = float(prediction_result.get("gender_confidence", 0) or 0) |
| if (str(how_gender) == "Female") or (str(deep_gender) == "Female"): |
| final_gender = "Female" |
| final_conf = max( |
| how_conf if how_gender == "Female" else 0, |
| deep_conf if deep_gender == "Female" else 0, |
| ) |
| prediction_result["gender_model_used"] = "Combined(H+DF)" |
| elif (str(how_gender) == "Male") and (str(deep_gender) == "Male"): |
| final_gender = "Male" |
| final_conf = max( |
| how_conf if how_gender == "Male" else 0, |
| deep_conf if deep_gender == "Male" else 0, |
| ) |
| prediction_result["gender_model_used"] = "Combined(H+DF)" |
| prediction_result["gender"] = final_gender |
| prediction_result["gender_confidence"] = round(float(final_conf), 4) |
| except Exception: |
| pass |
| else: |
| |
| prediction_result = self._predict_with_hybrid_model( |
| face_resized, face_for_deepface |
| ) |
| logger.warning( |
| f"Model {model_type.value} is not available, using hybrid mode" |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Prediction failed, using default values: {e}") |
| prediction_result = { |
| "gender": "Unknown", |
| "gender_confidence": 0.5, |
| "age": "25-32", |
| "age_confidence": 0.5, |
| "beauty_score": 5.0, |
| "beauty_raw_score": 0.5, |
| "emotion": "neutral", |
| "emotion_analysis": {"neutral": 100.0}, |
| "model_used": "fallback", |
| } |
|
|
| |
| |
| |
| |
|
|
| |
| gender = prediction_result.get("gender", "Unknown") |
| color_bgr = self.gender_colors.get(gender, (128, 128, 128)) |
| color_hex = f"#{color_bgr[2]:02x}{color_bgr[1]:02x}{color_bgr[0]:02x}" |
|
|
| |
| raw_age_str = prediction_result.get("age", "Unknown") |
| display_age_str = str(raw_age_str) |
| age_adjusted_flag = False |
| age_adjustment_value = int(getattr(config, "FEMALE_AGE_ADJUSTMENT", 0) or 0) |
| age_adjustment_threshold = int(getattr(config, "FEMALE_AGE_ADJUSTMENT_THRESHOLD", 999) or 999) |
|
|
| |
| try: |
| |
| if "-" in str(raw_age_str): |
| age_num = int(str(raw_age_str).split("-")[0].strip("() ")) |
| else: |
| age_num = int(str(raw_age_str).strip()) |
|
|
| if str(gender) == "Female" and age_num >= age_adjustment_threshold and age_adjustment_value > 0: |
| adjusted_age = max(0, age_num - age_adjustment_value) |
| display_age_str = str(adjusted_age) |
| age_adjusted_flag = True |
| try: |
| logger.info(f"Adjusted age for female (draw+data): {age_num} -> {adjusted_age}") |
| except Exception: |
| pass |
| except Exception: |
| |
| pass |
|
|
| |
| date_sub = get_date_subfolder(OUTPUT_DIR) |
| cropped_face_filename = f"{date_sub}/{original_image_hash}_face_{i + 1}.webp" |
| cropped_face_path = os.path.join(OUTPUT_DIR, cropped_face_filename) |
| try: |
| save_image_high_quality(face_cropped, cropped_face_path) |
| logger.info(f"cropped face: {cropped_face_path}") |
| except Exception as e: |
| logger.error(f"Failed to save cropped face {cropped_face_path}: {e}") |
| cropped_face_filename = None |
|
|
| |
| if config.DRAW_SCORE: |
| cv2.rectangle( |
| annotated_image, |
| (face_box[0], face_box[1]), |
| (face_box[2], face_box[3]), |
| color_bgr, |
| int(round(image.shape[0] / 400)), |
| 8, |
| ) |
|
|
| |
| beauty_score = prediction_result.get("beauty_score", 0) |
| label = f"{gender}, {display_age_str}, {beauty_score}" |
|
|
| font_scale = max( |
| 0.3, min(0.7, image.shape[0] / 800) |
| ) |
| font_thickness = 2 |
| font = cv2.FONT_HERSHEY_SIMPLEX |
| |
| text_x = face_box[0] |
| text_y = face_box[1] - 10 if face_box[1] - 10 > 20 else face_box[1] + 30 |
|
|
| |
| (text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, font_thickness) |
|
|
| |
| background_tl = (text_x, text_y - text_height - baseline) |
| background_br = (text_x + text_width, text_y + baseline) |
|
|
| if config.DRAW_SCORE: |
| cv2.rectangle( |
| annotated_image, |
| background_tl, |
| background_br, |
| color_bgr, |
| thickness=-1 |
| ) |
| cv2.putText( |
| annotated_image, |
| label, |
| (text_x, text_y), |
| font, |
| font_scale, |
| (255, 255, 255), |
| font_thickness, |
| cv2.LINE_AA, |
| ) |
|
|
| |
| face_result = { |
| "face_id": i + 1, |
| "gender": gender, |
| "gender_confidence": prediction_result.get("gender_confidence", 0), |
| "gender_model_used": prediction_result.get("gender_model_used", prediction_result.get("model_used", model_type.value)), |
| "age": display_age_str, |
| "age_confidence": prediction_result.get("age_confidence", 0), |
| "age_model_used": prediction_result.get("age_model_used", prediction_result.get("model_used", model_type.value)), |
| "beauty_score": prediction_result.get("beauty_score", 0), |
| "beauty_raw_score": prediction_result.get("beauty_raw_score", 0), |
| "emotion": prediction_result.get("emotion") or "neutral", |
| "emotion_analysis": prediction_result.get("emotion_analysis") or {"neutral": 100.0}, |
| |
| "bounding_box": { |
| "x1": int(face_box[0]), |
| "y1": int(face_box[1]), |
| "x2": int(face_box[2]), |
| "y2": int(face_box[3]), |
| }, |
| "color": { |
| "bgr": [int(color_bgr[0]), int(color_bgr[1]), int(color_bgr[2])], |
| "hex": color_hex, |
| }, |
| "cropped_face_filename": cropped_face_filename, |
| "model_used": prediction_result.get("model_used", model_type.value), |
| } |
|
|
| if age_adjusted_flag: |
| face_result["age_adjusted"] = True |
| face_result["age_adjustment_value"] = int(age_adjustment_value) |
|
|
| results["faces"].append(face_result) |
|
|
| results["annotated_image"] = annotated_image |
| return results |
|
|
| def _warmup_models(self): |
| """预热模型,减少首次调用延迟""" |
| try: |
| logger.info("Starting to warm up models...") |
|
|
| |
| test_image = np.ones((64, 64, 3), dtype=np.uint8) * 128 |
|
|
| |
| if DEEPFACE_AVAILABLE: |
| try: |
| import tempfile |
| deepface_emotion_enabled = bool(getattr(config, "DEEPFACE_EMOTION_ENABLED", True)) |
| with tempfile.NamedTemporaryFile(suffix='.webp', delete=False) as tmp_file: |
| cv2.imwrite(tmp_file.name, test_image, [cv2.IMWRITE_WEBP_QUALITY, 95]) |
| |
| DeepFace.analyze( |
| img_path=tmp_file.name, |
| actions=["age", "gender", "emotion"] if deepface_emotion_enabled else ["age", "gender"], |
| detector_backend="yolov8", |
| enforce_detection=False, |
| silent=True |
| ) |
| os.unlink(tmp_file.name) |
| logger.info("DeepFace model warm-up completed") |
| except Exception as e: |
| logger.warning(f"DeepFace model warm-up failed: {e}") |
|
|
| |
| try: |
| |
| blob = cv2.dnn.blobFromImage(test_image, 1.0, (300, 300), (104, 117, 123)) |
| self.face_net.setInput(blob) |
| self.face_net.forward() |
|
|
| |
| test_face = cv2.resize(test_image, (224, 224)) |
| blob = cv2.dnn.blobFromImage(test_face, 1.0, (224, 224), self.MODEL_MEAN_VALUES, swapRB=False) |
| self.age_net.setInput(blob) |
| self.age_net.forward() |
|
|
| |
| self.gender_net.setInput(blob) |
| self.gender_net.forward() |
|
|
| |
| self.beauty_net.setInput(blob) |
| self.beauty_net.forward() |
|
|
| logger.info("OpenCV DNN model warm-up completed") |
| except Exception as e: |
| logger.warning(f"OpenCV DNN model warm-up failed: {e}") |
|
|
| logger.info("Model warm-up completed") |
| except Exception as e: |
| logger.warning(f"Error occurred during model warm-up: {e}") |
|
|