Spaces:
Runtime error
Runtime error
import json | |
import zipfile | |
import numpy as np | |
import cv2 | |
import os | |
import gradio as gr | |
from deepface import DeepFace | |
from ultralytics import YOLO | |
import urllib.request | |
import asyncio | |
# Runtime settings are read once at import time from config.json, which must
# sit in the working directory. JSON text is UTF-8, so the encoding is pinned
# instead of relying on the platform's locale default.
with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# Maximum DeepFace distance at which two faces count as the same person.
FACE_DIST_TRESH = config['FACE_DIST_TRESH']
# Minimum detector confidence for a box to be kept as a face.
FACE_DET_TRESH = config['FACE_DET_TRESH']
# Location of the YOLO weights file; its basename is used as the local cache name.
YOLO_WEIGHTS_URL = config['YOLO_WEIGHTS_URL']

yolo_weights_filename = os.path.basename(YOLO_WEIGHTS_URL)
if not os.path.exists(yolo_weights_filename):
    # Download to a temporary name and rename afterwards, so an interrupted
    # download never leaves a truncated file that later runs would mistake
    # for valid cached weights.
    partial_weights_filename = yolo_weights_filename + '.part'
    urllib.request.urlretrieve(YOLO_WEIGHTS_URL, partial_weights_filename)
    os.replace(partial_weights_filename, yolo_weights_filename)

# Shared detector instance used by find_faces().
model = YOLO(yolo_weights_filename)
async def find_distance(base_face, check_face):
    """Return the DeepFace verification distance between two face images.

    ``DeepFace.verify`` is a blocking call, so it is dispatched through
    ``asyncio.to_thread`` and awaited; the event loop stays free while it runs.

    Args:
        base_face: Reference face image handed to ``DeepFace.verify``.
        check_face: Candidate face image compared against ``base_face``.

    Returns:
        The ``'distance'`` entry of the dictionary returned by
        ``DeepFace.verify``.
    """
    # NOTE(review): enforce_detection=False is forwarded unchanged; per the
    # DeepFace docs it presumably stops verify() from raising when it cannot
    # re-detect a face in an already-cropped image -- confirm.
    verification = await asyncio.to_thread(
        DeepFace.verify,
        base_face,
        check_face,
        enforce_detection=False,
    )
    return verification['distance']
def find_faces(image):
    """Detect faces in an image and return one cropped array per face.

    The module-level YOLO ``model`` is run on ``image``; every detection whose
    confidence is at least ``FACE_DET_TRESH`` is cut out of the image.

    Args:
        image: Image as a NumPy array indexed ``[row, column, ...]``.

    Returns:
        A list of NumPy views into ``image``, one per accepted detection.
        Detections whose clamped box has no area are skipped, so every
        returned crop has a non-zero height and width.
    """
    img_height, img_width = image.shape[:2]
    faces = []
    for box in model(image)[0].boxes:
        if float(box.conf) < FACE_DET_TRESH:
            continue
        # Use the corner representation directly instead of reconstructing
        # the top-left corner from the centre/size form.
        x1, y1, x2, y2 = (int(coord) for coord in box.xyxy[0])
        # Clamp to the image so a negative index can never wrap around and
        # silently produce a wrong or empty slice.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, img_width), min(y2, img_height)
        if x2 <= x1 or y2 <= y1:
            continue
        faces.append(image[y1:y2, x1:x2])
    return faces
async def load_images_from_zip(zip_path):
    """Decode every readable image stored in a zip archive.

    Args:
        zip_path: Path of the zip archive to read.

    Returns:
        A list of images decoded with ``cv2.imdecode(..., cv2.IMREAD_COLOR)``,
        in archive order. Directory entries, zero-length members and members
        that OpenCV cannot decode are skipped.
    """
    images = []
    with zipfile.ZipFile(zip_path, 'r') as zip_file:
        for entry in zip_file.infolist():
            # Archives routinely contain directory entries ("photos/");
            # they carry no data and must not reach the decoder.
            if entry.is_dir():
                continue
            with zip_file.open(entry) as file:
                # Read off the event loop, matching find_distance().
                img_bytes = await asyncio.to_thread(file.read)
            # An empty buffer is rejected by cv2.imdecode, so skip it here.
            if not img_bytes:
                continue
            img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR)
            if img is not None:
                images.append(img)
    return images
def create_image(images):
    """Render a list of face crops as a two-column report image.

    The report is a black canvas with an ``Image ID`` / ``Face`` header and
    one row per image: the image's index as text, followed by the image
    scaled to the row height (and capped to the available width).

    Args:
        images: Sequence of 3-channel ``uint8`` NumPy images. An image with
            zero height or width keeps its row and ID but is drawn blank.

    Returns:
        The report as a ``uint8`` NumPy array, cropped on the right to the
        last column containing any drawn pixel plus a small margin.
    """
    table_width = 800
    row_height = 100
    margin = 10
    text_margin = 20
    id_col_width = 100
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    color = (255, 255, 255)
    thickness = 2

    id_x = 10
    img_x = id_col_width + 10
    max_img_width = table_width - img_x

    table_height = text_margin + margin + (row_height + margin) * len(images)
    table = np.zeros((table_height, table_width, 3), np.uint8)

    # Header row.
    y = text_margin
    cv2.putText(table, 'Image ID', (id_x, y), font, font_scale, color, thickness)
    cv2.putText(table, 'Face', (img_x, y), font, font_scale, color, thickness)
    y += margin

    for i, img in enumerate(images):
        cv2.putText(table, str(i), (id_x, y + margin), font, font_scale, color, thickness)
        height, width = img.shape[:2]
        # Degenerate crops would divide by zero or hand cv2.resize an empty
        # source; keep the row (so IDs stay aligned) but draw nothing.
        if height > 0 and width > 0:
            # Preserve the aspect ratio at row height. At least one pixel
            # wide (cv2.resize rejects a zero-sized target) and never wider
            # than the space right of the ID column.
            new_width = int(width * row_height / height)
            new_width = min(max(new_width, 1), max_img_width)
            img_resized = cv2.resize(img, (new_width, row_height))
            table[y:y + row_height, img_x:img_x + new_width] = img_resized
        y += row_height + margin

    # Find the right-most column holding any non-zero pixel in one
    # vectorized pass, then crop with id_x pixels of padding.
    used_cols = np.flatnonzero(table.any(axis=(0, 2)))
    last_col = int(used_cols[-1]) if used_cols.size else 0
    return table[:, :last_col + 1 + id_x]
async def process_photo_async(photo, input_avatars_faces):
    """Return the faces in ``photo`` that match none of the avatar faces.

    Each face found by ``find_faces(photo)`` is compared against the avatar
    faces in order; comparison stops at the first avatar whose distance is at
    most ``FACE_DIST_TRESH``.

    Args:
        photo: Image to search for faces.
        input_avatars_faces: Sequence of reference face images.

    Returns:
        A list of the face crops from ``photo`` for which no avatar face was
        within the threshold. When ``input_avatars_faces`` is empty, every
        detected face is returned, since nothing can match.
    """
    not_found_faces = []
    for input_face in find_faces(photo):
        for avatar_face in input_avatars_faces:
            distance = await find_distance(avatar_face, input_face)
            if distance <= FACE_DIST_TRESH:
                break
        else:
            # No break: no avatar matched. This also covers an empty avatar
            # list, where the loop body never runs.
            not_found_faces.append(input_face)
    return not_found_faces
async def check_async(photos, input_avatars_faces, progress):
    """Process all photos and collect every face that matched no avatar.

    One task per photo is created up front via ``asyncio.create_task``; the
    tasks are then awaited in photo order, and ``progress`` is called after
    each one with the completed fraction.

    Args:
        photos: Iterable of images to process.
        input_avatars_faces: Reference face images passed to
            ``process_photo_async`` for every photo.
        progress: Callable invoked with a float in ``(0, 1]`` after each
            photo's task has finished.

    Returns:
        A flat list of unmatched faces, concatenated in photo order.
    """
    pending = [
        asyncio.create_task(process_photo_async(picture, input_avatars_faces))
        for picture in photos
    ]
    total = len(pending)
    unmatched = []
    for finished, job in enumerate(pending, start=1):
        unmatched.extend(await job)
        progress(finished / total)
    return unmatched
def check(avatars_zip, photos_zip, progress=gr.Progress()):
    """Build the report of faces in the photos that match no avatar.

    Args:
        avatars_zip: Uploaded file object whose ``.name`` is the path of the
            zip archive with the reference (avatar) photos.
        photos_zip: Uploaded file object whose ``.name`` is the path of the
            zip archive with the photos to check.
        progress: Gradio progress tracker; forwarded to ``check_async``.

    Returns:
        The image produced by ``create_image`` for all unmatched faces.
    """
    def _load_swapped(archive):
        # NOTE(review): cv2.imdecode yields BGR data, so despite the
        # constant's name this swap presumably produces RGB -- confirm that
        # this is the intended channel order for detection and display.
        decoded = asyncio.run(load_images_from_zip(archive.name))
        return [cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) for frame in decoded]

    reference_images = _load_swapped(avatars_zip)
    match_photos = _load_swapped(photos_zip)

    # Flatten the per-avatar detections into a single list of face crops.
    known_faces = [
        face
        for reference in reference_images
        for face in find_faces(reference)
    ]

    unmatched = asyncio.run(check_async(match_photos, known_faces, progress))
    return create_image(unmatched)
# Static HTML shown at the top of the page.
title = '<h1 style="text-align:center">SquadDetective</h1>'
logo = '<center><img src="https://i.ibb.co/C0BH40g/logo.png" width="300" height="300" alt="SquadDetective logo"></center>'

# UI layout: header, description, two zip uploads side by side, a button and
# the report image. The button runs check() on the two uploads.
with gr.Blocks(theme='soft', title='SquadDetective') as blocks:
    gr.HTML(title)
    gr.HTML(logo)
    gr.Markdown('**SquadDetective** is a service that helps sports teams to identify unclaimed players by comparing their faces to photos taken during matches. By using state-of-the-art facial recognition technology, this service can quickly and accurately match the faces of players in photos to a database of registered players, allowing teams to quickly identify any unclaimed players and take appropriate action. With **SquadDetective**, sports teams can ensure that all players are properly registered and eligible to play, helping to avoid potential penalties and other issues.')
    with gr.Row():
        # gr.File / gr.Image are the top-level component classes; the
        # gr.inputs / gr.outputs namespaces are deprecated aliases.
        avatars = gr.File(label='Avatar photos (zip)')
        photos = gr.File(label='Photos to be processed (zip)')
    inputs = [avatars, photos]
    process_button = gr.Button('Process')
    outputs = gr.Image(type='numpy', label='Report')
    process_button.click(fn=check, inputs=inputs, outputs=outputs)

# Queueing is enabled with a single worker, so requests run one at a time.
blocks.queue(concurrency_count=1).launch()