Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| import json | |
| import zipfile | |
| import numpy as np | |
| import cv2 | |
| import os | |
| import gradio as gr | |
| from deepface import DeepFace | |
| from ultralytics import YOLO | |
| import urllib.request | |
| import asyncio | |
# Runtime settings are read once at import time from config.json, which must
# sit in the current working directory.
with open('config.json', 'r') as f:
    config = json.load(f)

# Largest DeepFace distance at which two faces still count as the same person.
FACE_DIST_TRESH = config['FACE_DIST_TRESH']
# Smallest detector confidence for a box to be kept as a face.
FACE_DET_TRESH = config['FACE_DET_TRESH']
# Location the detector weights are fetched from when not cached locally.
YOLO_WEIGHTS_URL = config['YOLO_WEIGHTS_URL']

yolo_weights_filename = os.path.basename(YOLO_WEIGHTS_URL)
if not os.path.exists(yolo_weights_filename):
    # Download under a temporary name and move it into place only once the
    # transfer has finished: an interrupted download must not leave a
    # truncated file that the existence check above would accept next start.
    _partial_weights = yolo_weights_filename + '.part'
    try:
        urllib.request.urlretrieve(YOLO_WEIGHTS_URL, _partial_weights)
        os.replace(_partial_weights, yolo_weights_filename)
    finally:
        # Only still present when the download or the move failed.
        if os.path.exists(_partial_weights):
            os.remove(_partial_weights)

# Detector shared by every find_faces() call.
model = YOLO(yolo_weights_filename)
async def find_distance(base_face, check_face):
    """Return the DeepFace distance between two face images.

    ``DeepFace.verify`` is run in a worker thread via ``asyncio.to_thread``
    so the event loop is not blocked while it executes.  Detection is not
    enforced (``enforce_detection=False``).

    Args:
        base_face: Reference face image.
        check_face: Face image compared against the reference.

    Returns:
        The ``'distance'`` entry of the verification result.
    """
    verification = await asyncio.to_thread(
        DeepFace.verify,
        base_face,
        check_face,
        enforce_detection=False,
    )
    return verification['distance']
def find_faces(image):
    """Detect faces in ``image`` and return them as cropped sub-images.

    The module-level ``model`` is run on the image and only the boxes of the
    first result whose confidence is at least ``FACE_DET_TRESH`` are kept.

    Args:
        image: Image array to run the detector on; it is sliced as
            ``image[rows, cols]``.

    Returns:
        A list of crops (slices of ``image``), one per accepted box.
    """
    crops = []
    for detection in model(image)[0].boxes:
        if float(detection.conf) >= FACE_DET_TRESH:
            # The first two xywh values are used as the box centre, so the
            # top-left corner lies half a width / height before them.
            cx, cy, box_w, box_h = (int(value) for value in detection.xywh[0])
            left = int(cx - box_w / 2)
            top = int(cy - box_h / 2)
            crops.append(image[top:top + box_h, left:left + box_w])
    return crops
async def load_images_from_zip(zip_path):
    """Decode every image stored in a zip archive.

    Each member is read in the default executor and decoded with
    ``cv2.imdecode(..., cv2.IMREAD_COLOR)``.  Directory entries and
    zero-byte members are skipped up front: they carry no image data, and
    handing an empty buffer to the decoder aborts the whole load instead of
    just skipping that member.  Members the decoder cannot interpret (it
    returns ``None``) are skipped as well.

    Args:
        zip_path: Path of the zip archive to read.

    Returns:
        A list of decoded images in archive order.
    """
    images = []
    loop = asyncio.get_running_loop()
    with zipfile.ZipFile(zip_path, 'r') as zip_file:
        for member in zip_file.infolist():
            if member.is_dir():
                continue
            with zip_file.open(member) as file:
                img_bytes = await loop.run_in_executor(None, file.read)
            if not img_bytes:
                continue
            img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR)
            if img is not None:
                images.append(img)
    return images
def create_image(images):
    """Render the given images as a two-column report table.

    The table is a black canvas with a header line ('Image ID', 'Face') and
    one row per image: the image's index on the left and the image, scaled
    to the row height, on the right.  Unused black columns on the right are
    trimmed, keeping a small margin.

    Args:
        images: Sequence of image arrays to list.

    Returns:
        The table as a ``uint8`` array with three channels.
    """
    table_width, row_height, gap = 800, 100, 10
    header_y = 20
    id_x = 10
    face_x = 100 + 10  # width of the ID column plus padding
    row_stride = row_height + gap

    canvas = np.zeros(
        (header_y + gap + row_stride * len(images), table_width, 3),
        np.uint8,
    )

    def put_label(text, origin):
        # Every label shares the same font, scale, colour and thickness.
        cv2.putText(canvas, text, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

    put_label('Image ID', (id_x, header_y))
    put_label('Face', (face_x, header_y))

    first_row_top = header_y + gap
    for index, face in enumerate(images):
        top = first_row_top + index * row_stride
        src_height, src_width = face.shape[:2]
        # Keep the aspect ratio, but never draw past the right table edge.
        scaled_width = min(int(src_width * row_height / src_height), table_width - face_x)
        thumbnail = cv2.resize(face, (scaled_width, row_height))
        put_label(str(index), (id_x, top + gap))
        canvas[top:top + row_height, face_x:face_x + scaled_width] = thumbnail

    # Find the right-most column that contains any non-black pixel; fall
    # back to column 0 when the canvas is entirely black.
    used_columns = np.flatnonzero(canvas.any(axis=(0, 2)))
    last_used = used_columns[-1] if used_columns.size else 0
    return canvas[:, :last_used + 1 + id_x]
async def process_photo_async(photo, input_avatars_faces):
    """Return the faces in ``photo`` that match none of the avatar faces.

    Every face detected by ``find_faces`` is compared against the avatar
    faces in order; comparison stops at the first avatar whose distance is
    at most ``FACE_DIST_TRESH``.  A face that matches no avatar is reported,
    including when ``input_avatars_faces`` is empty (previously an empty
    avatar list caused every face to be silently treated as matched).

    Args:
        photo: Image to search for faces.
        input_avatars_faces: Face crops of the known (registered) people.

    Returns:
        A list with the unmatched face crops of ``photo``.
    """
    not_found_faces = []
    for input_face in find_faces(photo):
        for avatar_face in input_avatars_faces:
            if await find_distance(avatar_face, input_face) <= FACE_DIST_TRESH:
                break
        else:
            # No avatar was close enough (or there were no avatars at all).
            not_found_faces.append(input_face)
    return not_found_faces
async def check_async(photos, input_avatars_faces, progress):
    """Collect the unmatched faces of all photos, reporting progress.

    One task per photo is scheduled up front; the tasks are then awaited in
    submission order and ``progress`` is called after each one with the
    completed fraction.

    Args:
        photos: Images to process.
        input_avatars_faces: Face crops of the known people.
        progress: Callable taking the completed fraction as a float.

    Returns:
        The unmatched faces of every photo, concatenated in photo order.
    """
    pending = [
        asyncio.create_task(process_photo_async(picture, input_avatars_faces))
        for picture in photos
    ]
    total = len(pending)
    unmatched = []
    for finished, job in enumerate(pending, start=1):
        unmatched.extend(await job)
        progress(finished / total)
    return unmatched
def check(avatars_zip, photos_zip, progress=gr.Progress()):
    """Build the report of photo faces that match no avatar face.

    Both archives are loaded, faces are extracted from the avatar images,
    every photo is checked against them and the unmatched faces are
    rendered with ``create_image``.

    Args:
        avatars_zip: Uploaded archive of avatar images; its ``.name``
            attribute is used as the archive path.
        photos_zip: Uploaded archive of photos to check; ``.name`` is used
            the same way.
        progress: Progress tracker forwarded to ``check_async``.

    Returns:
        The report image produced by ``create_image``.
    """
    def load_archive(upload):
        decoded = asyncio.run(load_images_from_zip(upload.name))
        # cv2.COLOR_RGB2BGR reverses the order of the three colour channels.
        return [cv2.cvtColor(picture, cv2.COLOR_RGB2BGR) for picture in decoded]

    avatar_images = load_archive(avatars_zip)
    photo_images = load_archive(photos_zip)

    known_faces = [
        face
        for avatar in avatar_images
        for face in find_faces(avatar)
    ]
    unmatched = asyncio.run(check_async(photo_images, known_faces, progress))
    return create_image(unmatched)
# --- Gradio user interface -------------------------------------------------
title = '<h1 style="text-align:center">SquadDetective</h1>'
logo = '<center><img src="https://i.ibb.co/C0BH40g/logo.png" width="300" height="300" alt="SquadDetective logo"></center>'

with gr.Blocks(theme='soft', title='SquadDetective') as blocks:
    gr.HTML(title)
    gr.HTML(logo)
    gr.Markdown('**SquadDetective** is a service that helps sports teams to identify unclaimed players by comparing their faces to photos taken during matches. By using state-of-the-art facial recognition technology, this service can quickly and accurately match the faces of players in photos to a database of registered players, allowing teams to quickly identify any unclaimed players and take appropriate action. With **SquadDetective**, sports teams can ensure that all players are properly registered and eligible to play, helping to avoid potential penalties and other issues.')
    with gr.Row():
        # Components are taken from the top-level namespace: the gr.inputs /
        # gr.outputs namespaces are deprecated aliases of these classes.
        avatars = gr.File(label='Avatar photos (zip)')
        photos = gr.File(label='Photos to be processed (zip)')
    inputs = [avatars, photos]
    process_button = gr.Button('Process')
    outputs = gr.Image(type='numpy', label='Report')
    # Clicking the button runs check() on the two uploads and shows the report.
    process_button.click(fn=check, inputs=inputs, outputs=outputs)

# NOTE(review): `concurrency_count` is specific to the installed Gradio
# version - confirm it is still accepted before upgrading the dependency.
blocks.queue(concurrency_count=1).launch()