# built-in dependencies
import os
import pickle
import time
from typing import List, Union, Optional, Dict, Any

# 3rd party dependencies
import numpy as np
import pandas as pd
from tqdm import tqdm

# project dependencies
from deepface.commons import image_utils
from deepface.modules import representation, detection, verification
from deepface.commons import logger as log

logger = log.get_singletonish_logger()


def find(
    img_path: Union[str, np.ndarray],
    db_path: str,
    model_name: str = "VGG-Face",
    distance_metric: str = "cosine",
    enforce_detection: bool = True,
    detector_backend: str = "opencv",
    align: bool = True,
    expand_percentage: int = 0,
    threshold: Optional[float] = None,
    normalization: str = "base",
    silent: bool = False,
) -> List[pd.DataFrame]:
    """
    Identify individuals in a database

    Args:
        img_path (str or np.ndarray): The exact path to the image, a numpy array in BGR format,
            or a base64 encoded image. If the source image contains multiple faces, the result
            will include information for each detected face.

        db_path (string): Path to the folder containing image files. All detected faces
            in the database will be considered in the decision-making process.

        model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
            OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet (default is VGG-Face).

        distance_metric (string): Metric for measuring similarity. Options: 'cosine',
            'euclidean', 'euclidean_l2'.

        enforce_detection (boolean): If no face is detected in an image, raise an exception.
            Default is True. Set to False to avoid the exception for low-resolution images.

        detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
            'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8', 'centerface' or 'skip'.

        align (boolean): Perform alignment based on the eye positions.

        expand_percentage (int): expand detected facial area with a percentage (default is 0).

        threshold (float): Specify a threshold to determine whether a pair represents the same
            person or different individuals. This threshold is used for comparing distances.
            If left unset, default pre-tuned threshold values will be applied based on the
            specified model name and distance metric (default is None).

        normalization (string): Normalize the input image before feeding it to the model.
            Default is base. Options: base, raw, Facenet, Facenet2018, VGGFace, VGGFace2, ArcFace

        silent (boolean): Suppress or allow some log messages for a quieter analysis process.

    Returns:
        results (List[pd.DataFrame]): A list of pandas dataframes. Each dataframe corresponds
            to the identity information for an individual detected in the source image.
            The DataFrame columns include:

            - 'identity': Identity label of the detected individual.

            - 'target_x', 'target_y', 'target_w', 'target_h': Bounding box coordinates of the
                    target face in the database.

            - 'source_x', 'source_y', 'source_w', 'source_h': Bounding box coordinates of the
                    detected face in the source image.
            - 'threshold': threshold used to decide whether a pair represents the same person
                    or different individuals.

            - 'distance': Similarity score between the faces based on the specified model and
                    distance metric.
    """

    tic = time.time()

    if os.path.isdir(db_path) is not True:
        raise ValueError("Passed db_path does not exist!")

    file_parts = [
        "ds",
        "model",
        model_name,
        "detector",
        detector_backend,
        "aligned" if align else "unaligned",
        "normalization",
        normalization,
        "expand",
        str(expand_percentage),
    ]

    file_name = "_".join(file_parts) + ".pkl"
    file_name = file_name.replace("-", "").lower()
    datastore_path = os.path.join(db_path, file_name)
    representations = []

    # required columns for representations
    df_cols = [
        "identity",
        "hash",
        "embedding",
        "target_x",
        "target_y",
        "target_w",
        "target_h",
    ]
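
    # Illustration only: each item pickled to the datastore is a plain dict holding exactly
    # these columns. The values below are hypothetical:
    # {
    #     "identity": "my_db/alice/img1.jpg",   # path of the embedded image
    #     "hash": "d131dd02c5e6eec4...",        # file hash used to detect replaced images
    #     "embedding": [0.12, -0.03, ...],      # None if no face could be detected
    #     "target_x": 112, "target_y": 85, "target_w": 160, "target_h": 160,
    # }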
    # Ensure the proper pickle file exists
    if not os.path.exists(datastore_path):
        with open(datastore_path, "wb") as f:
            pickle.dump([], f)

    # Load the representations from the pickle file
    with open(datastore_path, "rb") as f:
        representations = pickle.load(f)

    # check each item of representations list has required keys
    for i, current_representation in enumerate(representations):
        missing_keys = list(set(df_cols) - set(current_representation.keys()))
        if len(missing_keys) > 0:
            raise ValueError(
                f"{i}-th item does not have some required keys - {missing_keys}."
                f" Consider deleting {datastore_path}"
            )

    # embedded images
    pickled_images = [representation["identity"] for representation in representations]

    # Get the list of images on storage
    storage_images = image_utils.list_images(path=db_path)

    if len(storage_images) == 0:
        raise ValueError(f"No item found in {db_path}")

    # Enforce data consistency between on-disk images and the pickle file
    must_save_pickle = False
    new_images = list(set(storage_images) - set(pickled_images))  # images added to storage
    old_images = list(set(pickled_images) - set(storage_images))  # images removed from storage

    # detect replaced images
    replaced_images = []
    for current_representation in representations:
        identity = current_representation["identity"]
        if identity in old_images:
            continue
        alpha_hash = current_representation["hash"]
        beta_hash = image_utils.find_image_hash(identity)
        if alpha_hash != beta_hash:
            logger.debug(
                f"Even though {identity} was represented before, it has been replaced since."
            )
            replaced_images.append(identity)

    if not silent and (len(new_images) > 0 or len(old_images) > 0 or len(replaced_images) > 0):
        logger.info(
            f"Found {len(new_images)} newly added image(s)"
            f", {len(old_images)} removed image(s)"
            f", {len(replaced_images)} replaced image(s)."
        )

    # append replaced images into both old and new images. these will be dropped and re-added.
    new_images = new_images + replaced_images
    old_images = old_images + replaced_images

    # remove old images first
    if len(old_images) > 0:
        representations = [rep for rep in representations if rep["identity"] not in old_images]
        must_save_pickle = True

    # find representations for new images
    if len(new_images) > 0:
        representations += __find_bulk_embeddings(
            employees=new_images,
            model_name=model_name,
            detector_backend=detector_backend,
            enforce_detection=enforce_detection,
            align=align,
            expand_percentage=expand_percentage,
            normalization=normalization,
            silent=silent,
        )  # add new images
        must_save_pickle = True

    if must_save_pickle:
        with open(datastore_path, "wb") as f:
            pickle.dump(representations, f)
        if not silent:
            logger.info(f"There are now {len(representations)} representations in {file_name}")

    # Bail out if there are no representations
    if len(representations) == 0:
        if not silent:
            toc = time.time()
            logger.info(f"find function duration {toc - tic} seconds")
        return []

    # ----------------------------
    # now we have representations for the facial database
    df = pd.DataFrame(representations)

    if silent is False:
        logger.info(f"Searching {img_path} in {df.shape[0]} length datastore")

    # the source image might contain more than one face
    source_objs = detection.extract_faces(
        img_path=img_path,
        detector_backend=detector_backend,
        grayscale=False,
        enforce_detection=enforce_detection,
        align=align,
        expand_percentage=expand_percentage,
    )
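
    # For reference: as used below, each source_obj is a dict containing at least "face"
    # (the detected face as a numpy array) and "facial_area" (a dict with "x", "y", "w"
    # and "h" pixel coordinates in the source image).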

    resp_obj = []

    for source_obj in source_objs:
        source_img = source_obj["face"]
        source_region = source_obj["facial_area"]
        target_embedding_obj = representation.represent(
            img_path=source_img,
            model_name=model_name,
            enforce_detection=enforce_detection,
            detector_backend="skip",
            align=align,
            normalization=normalization,
        )

        target_representation = target_embedding_obj[0]["embedding"]

        result_df = df.copy()  # df will be filtered for each detected face
        result_df["source_x"] = source_region["x"]
        result_df["source_y"] = source_region["y"]
        result_df["source_w"] = source_region["w"]
        result_df["source_h"] = source_region["h"]

        distances = []
        for _, instance in df.iterrows():
            source_representation = instance["embedding"]
            if source_representation is None:
                distances.append(float("inf"))  # no representation for this image
                continue

            target_dims = len(list(target_representation))
            source_dims = len(list(source_representation))
            if target_dims != source_dims:
                raise ValueError(
                    "Source and target embeddings must have same dimensions but "
                    + f"{target_dims}:{source_dims}. Model structure may have changed"
                    + f" after the pickle was created. Delete {file_name} and re-run."
                )

            distance = verification.find_distance(
                source_representation, target_representation, distance_metric
            )
            distances.append(distance)

        # ---------------------------
        target_threshold = threshold or verification.find_threshold(model_name, distance_metric)

        result_df["threshold"] = target_threshold
        result_df["distance"] = distances

        result_df = result_df.drop(columns=["embedding"])
        # pylint: disable=unsubscriptable-object
        result_df = result_df[result_df["distance"] <= target_threshold]
        result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True)

        resp_obj.append(result_df)

    # -----------------------------------
    if not silent:
        toc = time.time()
        logger.info(f"find function duration {toc - tic} seconds")

    return resp_obj


def __find_bulk_embeddings(
    employees: List[str],
    model_name: str = "VGG-Face",
    detector_backend: str = "opencv",
    enforce_detection: bool = True,
    align: bool = True,
    expand_percentage: int = 0,
    normalization: str = "base",
    silent: bool = False,
) -> List[Dict[str, Any]]:
    """
    Find embeddings of a list of images

    Args:
        employees (list): list of exact image paths

        model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
            OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet (default is VGG-Face).

        detector_backend (str): face detector model name

        enforce_detection (bool): set this to False if you want to proceed
            when no face can be detected

        align (bool): enable or disable alignment of the image
            before feeding it to the facial recognition model

        expand_percentage (int): expand detected facial area with a percentage (default is 0).

        normalization (str): normalization technique

        silent (bool): enable or disable informative logging

    Returns:
        representations (list): list of dicts with image name, hash, embedding
            and detected face area's coordinates
    """
    representations = []
    for employee in tqdm(
        employees,
        desc="Finding representations",
        disable=silent,
    ):
        file_hash = image_utils.find_image_hash(employee)

        try:
            img_objs = detection.extract_faces(
                img_path=employee,
                detector_backend=detector_backend,
                grayscale=False,
                enforce_detection=enforce_detection,
                align=align,
                expand_percentage=expand_percentage,
            )
        except ValueError as err:
            logger.error(f"Exception while extracting faces from {employee}: {str(err)}")
            img_objs = []

        if len(img_objs) == 0:
            representations.append(
                {
                    "identity": employee,
                    "hash": file_hash,
                    "embedding": None,
                    "target_x": 0,
                    "target_y": 0,
                    "target_w": 0,
                    "target_h": 0,
                }
            )
        else:
            for img_obj in img_objs:
                img_content = img_obj["face"]
                img_region = img_obj["facial_area"]
                embedding_obj = representation.represent(
                    img_path=img_content,
                    model_name=model_name,
                    enforce_detection=enforce_detection,
                    detector_backend="skip",
                    align=align,
                    normalization=normalization,
                )

                img_representation = embedding_obj[0]["embedding"]
                representations.append(
                    {
                        "identity": employee,
                        "hash": file_hash,
                        "embedding": img_representation,
                        "target_x": img_region["x"],
                        "target_y": img_region["y"],
                        "target_w": img_region["w"],
                        "target_h": img_region["h"],
                    }
                )

    return representations
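

# Minimal usage sketch (illustrative only). "img1.jpg" and "my_db" are hypothetical paths:
# any probe image and any folder of identity images would do. The first call computes and
# pickles the database embeddings inside db_path; later calls reuse the pickle and only
# refresh added, removed or replaced images.
if __name__ == "__main__":
    dfs = find(
        img_path="img1.jpg",  # hypothetical probe image
        db_path="my_db",  # hypothetical folder of identity images
        model_name="Facenet512",
        distance_metric="cosine",
    )
    # one dataframe per face detected in the probe image
    for matched_df in dfs:
        print(matched_df[["identity", "distance", "threshold"]].head())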