import os
import json
import math
import base64

import pyzipper
import numpy as np
import gradio as gr
import opennsfw2 as n2

from annoy import AnnoyIndex
from deepface.commons import functions
from deepface.basemodels import Facenet512
from fastcore.all import *
from fastai.vision.all import *  # provides PILImage
|
|
# Keep DeepFace model downloads and weights inside the working directory.
os.environ["DEEPFACE_HOME"] = "."

# Yahoo OpenNSFW2 model, used in predict() to pick out frames worth searching.
yahooNsfwModel = n2.make_open_nsfw_model()

# Facenet512 produces the 512-dimensional embeddings stored in the Annoy index.
model = Facenet512.loadModel()

input_shape_x, input_shape_y = functions.find_input_shape(model)

# Approximate-nearest-neighbour index over the face embeddings; face.json maps
# each Annoy item id back to a performer id.
index = AnnoyIndex(512, "euclidean")
index.load("face.db")

with open("face.json") as f:
    ANNOY_INDEX = json.load(f)
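# A minimal sketch of how face.db and face.json could be built offline
# (an assumption for illustration; the build step is not part of this file):
#
#     builder = AnnoyIndex(512, "euclidean")
#     id_map = []
#     for performer_id, embedding in embeddings:  # hypothetical source of vectors
#         builder.add_item(len(id_map), embedding)
#         id_map.append(performer_id)
#     builder.build(10)  # number of trees
#     builder.save("face.db")
#     json.dump(id_map, open("face.json", "w"))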
|
|
# performers.json ships inside an AES-encrypted zip; the password is taken
# from the VISAGE_KEY environment variable.
with pyzipper.AESZipFile("persons.zip") as zf:
    password = os.getenv("VISAGE_KEY", "").encode("ascii")
    zf.setpassword(password)
    PERFORMER_DB = json.loads(zf.read("performers.json"))
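# Assumed shape, for illustration only: PERFORMER_DB maps a performer id to a
# metadata dict that gets merged into each search hit, e.g.
#     {"123": {"name": "...", "country": "..."}}
# The exact schema depends on performers.json and is not defined here.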
|
|
def image_search_performer(image, threshold=20.0, results=3):
    """Search for a performer in an image.

    Returns a list of performers, each with at least the following keys:
    - id: the performer's id
    - distance: the distance between the face in the image and the performer's face
    - confidence: a confidence score between 0 and 100
    - hits: the number of times the performer was found in our database
    """

    image_array = np.array(image)

    try:
        # Detect and align the face, then embed it with Facenet512.
        img = functions.preprocess_face(
            img=image_array,
            target_size=(input_shape_x, input_shape_y),
            detector_backend="retinaface",
            align=True,
        )
        img = functions.normalize_input(img, normalization="Facenet2018")
        face = model.predict(img)[0]
        return search_performer(face, threshold, results)
    except Exception as e:
        print(e, "for img", image)

    return []
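# Example call (hypothetical input), mirroring what the Gradio UI sends:
#     image_search_performer(PILImage.create("face.jpg"), threshold=20.0, results=3)
# The pipeline is: detect + align (RetinaFace) -> normalize (Facenet2018)
# -> embed (Facenet512) -> nearest-neighbour lookup in the Annoy index.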
|
|
def search_performer(vector, threshold=20.0, results=3):
    # Gradio may pass None for unset sliders; fall back to the defaults.
    threshold = threshold or 20.0
    results = results or 3

    ids, distances = index.get_nns_by_vector(
        vector, 50, search_k=10000, include_distances=True
    )
    persons = {}
    for p, distance in zip(ids, distances):
        person_id = ANNOY_INDEX[p]
        if person_id in persons:
            # Repeat hits for the same performer are rewarded: each extra hit
            # shaves 0.5 off the effective distance, and the confidence is
            # recomputed from that adjusted distance.
            persons[person_id]["hits"] += 1
            persons[person_id]["distance"] -= 0.5
            persons[person_id]["confidence"] = normalize_confidence_from_distance(
                persons[person_id]["distance"], threshold
            )
            continue

        persons[person_id] = {
            "id": person_id,
            "distance": round(distance, 2),
            "confidence": normalize_confidence_from_distance(distance, threshold),
            "hits": 1,
        }

        if person_id in PERFORMER_DB:
            persons[person_id].update(PERFORMER_DB[person_id])

    persons = sorted(persons.values(), key=lambda x: x["distance"])
    persons = [p for p in persons if p["distance"] < threshold]
    return persons[:results]
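# Worked example of the hit bonus: if a performer's embeddings appear twice in
# the top-50 neighbours at distances 10.0 and 12.0, the entry keeps the first
# distance minus the bonus (10.0 - 0.5 = 9.5) with hits == 2, so repeat matches
# sort ahead of single matches at a similar distance.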
|
|
def normalize_confidence_from_distance(distance, threshold=20.0):
    """Normalize confidence to a 0-100 scale."""
    return int(face_distance_to_conf(distance, threshold) * 100.0)
|
|
def face_distance_to_conf(face_distance, face_match_threshold=20.0):
    """Map a face distance to a similarity confidence value in [0.0, 1.0]."""
    if face_distance > face_match_threshold:
        # Beyond the threshold, fall off linearly.
        linear_val = (1.0 - face_distance) / ((1.0 - face_match_threshold) * 2.0)
        return linear_val

    # Within the threshold, boost the linear value towards 1.0. Here
    # linear_val >= 0.5, so the fractional power is always well-defined.
    linear_val = 1.0 - (face_distance / (face_match_threshold * 2.0))
    return linear_val + ((1.0 - linear_val) * math.pow((linear_val - 0.5) * 2, 0.2))
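# Sample values at the default threshold of 20.0:
#     face_distance_to_conf(0.0)  -> 1.00   (confidence 100)
#     face_distance_to_conf(10.0) -> ~0.97  (confidence 96)
#     face_distance_to_conf(20.0) -> 0.50   (confidence 50)
# The over-threshold branch scales by (1.0 - threshold), which suggests it was
# written for distances on a 0-1 scale; with threshold 20.0 its output is odd
# but harmless, since search_performer() drops results at or beyond the threshold.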
|
|
def predict(image, vtt):
    vtt = base64.b64decode(vtt.replace("data:text/vtt;base64,", ""))
    sprite = PILImage.create(image)

    # Crop every sprite-sheet frame referenced by the VTT and preprocess it
    # for the NSFW classifier.
    pre_process_data = []
    for left, top, width, height in getVTToffsets(vtt):
        cut_frame = sprite.crop((left, top, left + width, top + height))
        frame = n2.preprocess_image(cut_frame, n2.Preprocessing.YAHOO)
        pre_process_data.append(
            (np.expand_dims(frame, axis=0), cut_frame, (left, top, width, height))
        )

    if not pre_process_data:
        return {}

    # Batch-classify the frames; the first output is the SFW score, so frames
    # scoring below 0.5 (the likely NSFW ones) are kept for face search.
    offsets = []
    images = []
    tensors = [i[0] for i in pre_process_data]
    predictions = yahooNsfwModel.predict(np.vstack(tensors))
    for i, prediction in enumerate(predictions):
        if prediction[0] < 0.5:
            images.append(PILImage.create(np.asarray(pre_process_data[i][1])))
            offsets.append(pre_process_data[i][2])  # currently unused

    # Merge per-frame matches: keep the best distance and confidence per
    # performer and accumulate hits across frames.
    persons = {}
    for image in images:
        for person in image_search_performer(image):
            person_id = person["id"]
            if person_id not in persons:
                persons[person_id] = person
            else:
                existing_person = persons[person_id]
                existing_person["hits"] += person["hits"]
                if person["distance"] < existing_person["distance"]:
                    existing_person["distance"] = person["distance"]
                if person["confidence"] > existing_person["confidence"]:
                    existing_person["confidence"] = person["confidence"]

    return persons
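# Example of the merge rule (hypothetical numbers): if performer "123" is found
# in two frames as {"distance": 9.5, "confidence": 96, "hits": 2} and
# {"distance": 11.0, "confidence": 90, "hits": 1}, the merged entry is
# {"distance": 9.5, "confidence": 96, "hits": 3}.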
|
|
def getVTToffsets(vtt):
    """Yield (left, top, width, height) crop offsets from a WebVTT sprite index."""
    for line in vtt.decode("utf-8").split("\n"):
        line = line.strip()
        if "xywh=" not in line:
            continue
        left, top, width, height = (
            int(i) for i in line.split("xywh=")[-1].split(",")
        )
        yield left, top, width, height
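# A typical cue in such a VTT file looks like (illustrative example):
#     00:00:00.000 --> 00:00:05.000
#     sprite.jpg#xywh=0,0,160,90
# i.e. the media-fragment syntax xywh=left,top,width,height.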
|
|
image_search = gr.Interface(
    fn=image_search_performer,
    inputs=[
        gr.Image(),
        gr.Slider(label="threshold", minimum=0.0, maximum=30.0, value=20.0),
        gr.Slider(label="results", minimum=0, maximum=50, value=3, step=1),
    ],
    outputs=gr.JSON(label=""),
    title="Who is in the photo?",
    description="Upload an image of a person and we'll tell you who it is.",
)
|
|
sprite_search = gr.Interface(
    fn=predict,
    inputs=[
        gr.Image(),
        gr.Textbox(label="VTT file"),
    ],
    outputs=gr.JSON(label=""),
)
|
|
gr.TabbedInterface([image_search, sprite_search]).launch(
    enable_queue=True, server_name="0.0.0.0"
)
|
|