import json import zipfile import numpy as np import cv2 import os import gradio as gr from deepface import DeepFace from ultralytics import YOLO import urllib.request import asyncio with open('config.json', 'r') as f: config = json.load(f) FACE_DIST_TRESH = config['FACE_DIST_TRESH'] FACE_DET_TRESH = config['FACE_DET_TRESH'] YOLO_WEIGHTS_URL = config['YOLO_WEIGHTS_URL'] yolo_weights_filename = os.path.basename(YOLO_WEIGHTS_URL) if not os.path.exists(yolo_weights_filename): urllib.request.urlretrieve(YOLO_WEIGHTS_URL, yolo_weights_filename) model = YOLO(yolo_weights_filename) async def find_distance(base_face, check_face): result = await asyncio.to_thread(DeepFace.verify, base_face, check_face, enforce_detection=False) return result['distance'] def find_faces(image): outputs = model(image) faces = [] for box in outputs[0].boxes: if float(box.conf) >= FACE_DET_TRESH: x, y, w, h = [int(coord) for coord in box.xywh[0]] x_center, y_center = x + w / 2, y + h / 2 x1 = int(x_center - w) y1 = int(y_center - h) crop_img = image[y1:y1+h, x1:x1+w] faces.append(crop_img) return faces async def load_images_from_zip(zip_path): images = [] loop = asyncio.get_running_loop() with zipfile.ZipFile(zip_path, 'r') as zip_file: for file_name in zip_file.namelist(): with zip_file.open(file_name) as file: img_bytes = await loop.run_in_executor(None, file.read) img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR) if img is not None: images.append(img) return images def create_image(images): table_width = 800 row_height = 100 margin = 10 text_margin = 20 id_col_width = 100 font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.5 color = (255, 255, 255) thickness = 2 table_height = text_margin + margin + (row_height + margin) * len(images) table = np.zeros((table_height, table_width, 3), np.uint8) id_x = 10 img_x = id_col_width + 10 y = text_margin cv2.putText(table, 'Image ID', (id_x, y), font, font_scale, color, thickness) cv2.putText(table, 'Face', (img_x, y), font, font_scale, color, thickness) y += margin for i, img in enumerate(images): height, width = img.shape[:2] new_width = int(width * row_height / height) if img_x + new_width > table_width: new_width = table_width - img_x img_resized = cv2.resize(img, (new_width, row_height)) cv2.putText(table, str(i), (id_x, y + margin), font, font_scale, color, thickness) table[y:y+row_height, img_x:img_x+new_width] = img_resized y += row_height + margin for col in range(table.shape[1]-1, -1, -1): if not np.any(table[:, col]): continue else: break table_cropped = table[:, :col+1+id_x] return table_cropped async def process_photo_async(photo, input_avatars_faces): not_found_faces = [] avatars_faces_count = len(input_avatars_faces) input_faces = find_faces(photo) for input_face in input_faces: for i in range(avatars_faces_count): distance = await find_distance(input_avatars_faces[i], input_face) if distance <= FACE_DIST_TRESH: break elif i + 1 == avatars_faces_count: not_found_faces.append(input_face) return not_found_faces async def check_async(photos, input_avatars_faces, progress): tasks = [] not_found_faces = [] for photo in photos: task = asyncio.create_task(process_photo_async(photo, input_avatars_faces)) tasks.append(task) for i, task in enumerate(tasks): result = await task not_found_faces += result progress((i+1)/len(tasks)) return not_found_faces def check(avatars_zip, photos_zip, progress=gr.Progress()): avatars = asyncio.run(load_images_from_zip(avatars_zip.name)) avatars = [cv2.cvtColor(avatar, cv2.COLOR_RGB2BGR) for avatar in avatars] photos = asyncio.run(load_images_from_zip(photos_zip.name)) photos = [cv2.cvtColor(photo, cv2.COLOR_RGB2BGR) for photo in photos] input_avatars_faces = [find_faces(avatar) for avatar in avatars] input_avatars_faces = [face for faces in input_avatars_faces for face in faces] not_found_faces = asyncio.run(check_async(photos, input_avatars_faces, progress)) return create_image(not_found_faces) title = '