import subprocess |
subprocess.run(["sh", "tddfa/build.sh"]) |
import gradio as gr |
from gradio.components import Dropdown |
import cv2 as cv |
import torch |
from torchvision import transforms |
from DeePixBiS.Model import DeePixBiS |
import yaml |
import numpy as np |
import pandas as pd |
from skimage.io import imread, imsave |
from tddfa.utils.depth import depth |
from tddfa.TDDFA_ONNX import TDDFA_ONNX |
import torch.optim as optim |
from DSDG.DUM.models.CDCNs_u import Conv2d_cd, CDCN_u |
import io |
import uuid |
import numpy as np |
from PIL import Image |
import boto3 |
import os |
# Process-wide OpenMP runtime switches, set before any model is built.
# NOTE(review): presumably these tolerate duplicate OpenMP runtimes and cap
# the CPU thread count - confirm against the deployment target.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '4'
# SECURITY: an AWS secret access key used to be hard-coded on this line. It
# has been removed: boto3 picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# from the process environment (or another source in its credential chain),
# so supply both through the deployment's secret store. The key that was
# committed here must be treated as leaked and rotated.

app_version = 'dsdg_vid_2'
device = torch.device("cpu")
labels = ['Live', 'Spoof']
# Multiplier applied to the raw DSDG score before it is compared/displayed.
DSDG_FACTOR = 1000000
# NOTE(review): PIX_THRESHOLD (read by deepix_model_inference) and
# DSDG_THRESHOLD (default of the threshold slider) were referenced but never
# defined in this file, which made the module fail with NameError while the
# UI was being built. The values below are placeholders - confirm them
# against the calibrated thresholds before relying on them.
PIX_THRESHOLD = 0.5
DSDG_THRESHOLD = 80
examples = [
    ['examples/1_1_21_2_33_scene_fake.jpg'],
    ['examples/frame150_real.jpg'],
    ['examples/1_2.avi_125_real.jpg'],
    ['examples/1_3.avi_25_fake.jpg'],
]

# Haar cascade used to locate faces in each frame.
faceClassifier = cv.CascadeClassifier('./DeePixBiS/Classifiers/haarface.xml')

# Preprocessing applied to a face crop before it is fed to DeePixBiS.
tfms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# DeePixBiS model. map_location keeps the load working on CPU-only hosts even
# if the checkpoint was saved from a GPU (same as the CDCN load below).
deepix_model = DeePixBiS(pretrained=False)
deepix_model.load_state_dict(
    torch.load('./DeePixBiS/DeePixBiS.pth', map_location=device))
deepix_model.eval()

# TDDFA (ONNX, CPU) used to reconstruct the face depth map.
depth_config_path = 'tddfa/configs/mb1_120x120.yml'
# Context manager so the config file handle is closed deterministically.
with open(depth_config_path) as cfg_file:
    cfg = yaml.load(cfg_file, Loader=yaml.SafeLoader)
tddfa = TDDFA_ONNX(gpu_mode=False, **cfg)

# CDCN_u network used for the DSDG score.
cdcn_model = CDCN_u(basic_conv=Conv2d_cd, theta=0.7)
cdcn_model = cdcn_model.to(device)
weights = torch.load('./DSDG/DUM/checkpoint/CDCN_U_P1_updated.pkl', map_location=device)
cdcn_model.load_state_dict(weights)
# Not needed for inference itself; kept so existing references to
# `optimizer` elsewhere in the module keep working.
optimizer = optim.Adam(cdcn_model.parameters(), lr=0.001, weight_decay=0.00005)
cdcn_model.eval()
class Normaliztion_valtest(object):
    """Callable transform that recentres and rescales pixel values.

    Every value is shifted by 127.5 and divided by 128 (the mxnet
    convention), which maps the 0..255 range to roughly [-1, 1].
    """

    def __call__(self, image_x):
        """Return ``(image_x - 127.5) / 128`` for a scalar or an array."""
        centred = image_x - 127.5
        return centred / 128
def find_largest_face(faces):
    """Return the detection with the greatest ``w * h`` area.

    Each item of ``faces`` is unpacked as ``(x, y, w, h)``. On ties the
    first detection wins. ``None`` is returned when ``faces`` is empty or
    when no detection has a positive area.
    """
    best = None
    best_area = 0
    for candidate in faces:
        _, _, width, height = candidate
        candidate_area = width * height
        if not candidate_area > best_area:
            continue
        best, best_area = candidate, candidate_area
    return best
def extract_face(img):
    """Locate the largest face in an RGB image.

    The image is converted to greyscale and run through the module-level
    Haar cascade (``faceClassifier``); of all detections the one with the
    biggest area is returned as ``(x, y, w, h)``.

    Returns ``None`` when ``img`` is ``None`` or no face is detected.
    """
    if img is None:
        return None
    gray = cv.cvtColor(img, cv.COLOR_RGB2GRAY)
    detections = faceClassifier.detectMultiScale(
        gray, scaleFactor=1.1, minNeighbors=4)
    if len(detections) == 0:
        return None
    return find_largest_face(detections)
def deepix_model_inference(img, bbox):
    """Classify a face crop with DeePixBiS and draw the verdict on a copy.

    Args:
        img: RGB image array containing the face.
        bbox: ``(x, y, x2, y2)`` corner coordinates of the face box.

    Returns:
        A tuple ``(annotated, confidences, cls)`` where ``annotated`` is a
        copy of ``img`` with the box and label drawn on it, ``confidences``
        is ``{'Real confidence': score}`` with ``score`` the mean of the
        predicted pixel mask, and ``cls`` is ``1`` for Real / ``0`` for Spoof
        (Real when ``score >= PIX_THRESHOLD``).
    """
    x, y, x2, y2 = bbox
    face_tensor = tfms(img[y:y2, x:x2]).unsqueeze(0)
    # Inference only: skip autograd bookkeeping, and call the module itself
    # rather than .forward() so any registered hooks are honoured. The second
    # output of the model is not used here.
    with torch.no_grad():
        mask, _ = deepix_model(face_tensor)
    res_deepix = torch.mean(mask).item()

    is_real = res_deepix >= PIX_THRESHOLD
    cls_name = 'Real' if is_real else 'Spoof'
    # Colours are RGB: green for Real, red for Spoof.
    color_deepix = (0, 255, 0) if is_real else (255, 0, 0)
    img_deepix = cv.rectangle(img.copy(), (x, y), (x2, y2), color_deepix, 2)
    cv.putText(img_deepix, cls_name, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_deepix)

    confidences_deepix = {'Real confidence': res_deepix}
    return img_deepix, confidences_deepix, int(is_real)
def get_depth_img(img, bbox):
    """Render a depth image of the face inside ``bbox`` using TDDFA.

    ``bbox`` is extended with a trailing ``1`` before being handed to the
    TDDFA model (presumably a detection confidence - confirm against the
    TDDFA API). The dense vertices reconstructed from the model output are
    passed to ``depth`` with the background disabled.
    """
    scored_box = [*bbox, 1]
    params, roi_boxes = tddfa(img, [scored_box])
    vertices = tddfa.recon_vers(params, roi_boxes, dense_flag=True)
    return depth(img, vertices, tddfa.tri, with_bg_flag=False)
def analyze_face(img, min_face_size=210):
    """Find the largest face in a frame and compute its depth image.

    Args:
        img: RGB frame, or ``None``.
        min_face_size: Minimum face width and height in pixels. Smaller
            faces are only annotated, not analysed.
            NOTE(review): the condition guarding the "Small res" branch was
            missing, so that branch always returned and the depth map below
            was unreachable. The 210 px default is a placeholder - confirm
            the intended threshold.

    Returns:
        ``(image, bbox, depth_img)``:

        * no face: ``(img, (), None)``;
        * face below ``min_face_size``: a copy of ``img`` with a black box
          and a "Small res" caption, the ``(x, y, x2, y2)`` box, and ``None``;
        * otherwise: an unannotated copy of ``img``, the box, and the depth
          image from ``get_depth_img``.
    """
    face = extract_face(img)
    if face is None:
        return img, (), None

    x, y, w, h = face
    x2 = x + w
    y2 = y + h
    bbox = (x, y, x2, y2)
    img_dsdg = img.copy()

    if w < min_face_size or h < min_face_size:
        color_dsdg = (0, 0, 0)
        text = f'Small res ({w}*{h})'
        img_dsdg = cv.rectangle(img_dsdg, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img_dsdg, text, (x, y2 + 30),
                   cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
        return img_dsdg, bbox, None

    # The depth map is computed from the untouched frame, not the copy.
    depth_img = get_depth_img(img, bbox)
    return img_dsdg, bbox, depth_img
def prepare_data_dsdg(images, boxes, depths):
    """Build the image and depth tensors consumed by the DSDG model.

    For every ``(image, bbox, depth_img)`` triple the face region given by
    ``bbox = (x, y, x2, y2)`` is cropped from both the image and the
    greyscale depth image; the image crop is resized to 256x256 and the
    depth crop to 32x32.

    Returns:
        ``(image_x, depth_x)`` float tensors: ``image_x`` has shape
        ``(N, 3, 256, 256)`` and is normalised with ``Normaliztion_valtest``;
        ``depth_x`` has shape ``(N, 32, 32)``. ``N`` is ``len(images)``.
    """
    count = len(images)
    frame_batch = np.zeros((count, 256, 256, 3))
    depth_batch = np.ones((count, 32, 32))

    triples = zip(images, boxes, depths)
    for idx, (frame, (left, top, right, bottom), depth_map) in enumerate(triples):
        gray_depth = cv.cvtColor(depth_map, cv.COLOR_RGB2GRAY)
        face_rgb = frame[top:bottom, left:right]
        face_depth = gray_depth[top:bottom, left:right]
        frame_batch[idx] = cv.resize(face_rgb, (256, 256))
        depth_batch[idx] = cv.resize(face_depth, (32, 32))

    # Channels-last -> channels-first, then normalise.
    normalise = transforms.Compose([Normaliztion_valtest()])
    frame_batch = normalise(frame_batch.transpose((0, 3, 1, 2)))

    image_x = torch.from_numpy(frame_batch.astype(float)).float()
    depth_x = torch.from_numpy(depth_batch.astype(float)).float()
    return image_x, depth_x
def dsdg_model_inference(imgs, bboxes, depth_imgs):
    """Score a sequence of face frames with the CDCN/DSDG model.

    Args:
        imgs: Sequence of RGB frames.
        bboxes: Matching ``(x, y, x2, y2)`` face boxes.
        depth_imgs: Matching depth images.

    Returns:
        ``(res_dsdg, scores)``. ``scores`` holds one value per frame:
        ``sum(mu) / sum(depth_map)`` multiplied by ``DSDG_FACTOR``, where
        ``mu`` is the first output of the model. ``res_dsdg`` is the mean of
        the unscaled per-frame values, multiplied by ``DSDG_FACTOR``; it is
        forced to ``0.0`` when that mean is above 10 or is not finite. With
        no frames the result is ``(0.0, [])``.
    """
    image_x, depth_x = prepare_data_dsdg(imgs, bboxes, depth_imgs)
    # Add a leading batch axis: (1, frames, ...).
    inputs = image_x.unsqueeze(0).to(device)
    test_maps = depth_x.unsqueeze(0).to(device)

    num_frames = inputs.shape[1]
    if num_frames == 0:
        # Nothing to average; avoids a division by zero below.
        return 0.0, []

    scores = []
    map_score = 0.0
    # Pure inference: no optimizer step is involved, so no gradients need to
    # be tracked or cleared.
    with torch.no_grad():
        for frame_t in range(num_frames):
            mu, *_ = cdcn_model(inputs[:, frame_t, :, :, :])
            score_norm = torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
            scores.append(score_norm.item() * DSDG_FACTOR)
            map_score = map_score + score_norm

    res_dsdg = (map_score / num_frames).item()
    # Values above 10 are discarded as in the original code (presumably
    # implausible outliers - confirm). NaN/inf (e.g. an all-zero depth crop)
    # is discarded too, so callers never see a non-finite score.
    if not np.isfinite(res_dsdg) or res_dsdg > 10:
        res_dsdg = 0.0
    res_dsdg = res_dsdg * DSDG_FACTOR
    return res_dsdg, scores
def inference(img, dsdg_thresh):
    """Run the DSDG check on a single RGB image.

    The previous body called ``dsdg_model_inference(img, bbox, dsdg_thresh)``
    and unpacked three results, but that function takes
    ``(imgs, bboxes, depth_imgs)`` and returns two values, so every call with
    a detected face raised. The depth image is now computed here and the
    model is called with one-element sequences.

    Args:
        img: RGB image array, or ``None``.
        dsdg_thresh: Score at or above which the face is labelled Real.

    Returns:
        A 6-tuple in the same layout as before:
        ``(img, {}, 2, img_dsdg, confidences_dsdg, cls_dsdg)`` when a face is
        found, where ``img_dsdg`` is an annotated copy of ``img``,
        ``confidences_dsdg`` is ``{'Real confidence': score}`` and
        ``cls_dsdg`` is ``1`` for Real / ``0`` for Spoof; otherwise
        ``(img, {}, None, img, {}, None)``. The first three slots are the
        unchanged placeholder values of the original code.
    """
    face = extract_face(img)
    if face is None:
        return img, {}, None, img, {}, None

    x, y, w, h = face
    x2 = x + w
    y2 = y + h
    bbox = (x, y, x2, y2)

    depth_img = get_depth_img(img, bbox)
    res_dsdg, _ = dsdg_model_inference([img], [bbox], [depth_img])

    is_real = res_dsdg >= dsdg_thresh
    cls_name = 'Real' if is_real else 'Spoof'
    # Colours are RGB: green for Real, red for Spoof.
    color_dsdg = (0, 255, 0) if is_real else (255, 0, 0)
    img_dsdg = cv.rectangle(img.copy(), (x, y), (x2, y2), color_dsdg, 2)
    cv.putText(img_dsdg, cls_name, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    confidences_dsdg = {'Real confidence': res_dsdg}
    return img, {}, 2, img_dsdg, confidences_dsdg, int(is_real)
def process_video(vid_path, dsdg_thresh):
    """Run the DSDG liveness check on a video and render an annotated copy.

    Every 5th frame is analysed with ``analyze_face``; frames with a usable
    face and depth image are scored together by ``dsdg_model_inference``.
    All sampled frames are then written to a new mp4 at 6 fps
    (NOTE(review): presumably chosen so a ~30 fps recording plays back at
    its original speed - confirm).

    Args:
        vid_path: Path of the input video, or ``None`` if nothing was
            recorded.
        dsdg_thresh: Score at or above which a result is labelled Real.

    Returns:
        A 6-tuple matching the UI outputs:
        ``(vid_path, {'Not supported right now': 0}, -1, output_video_path,
        text, res_dsdg)``. When there is no video or no usable face the
        fourth item is ``vid_path`` itself, the text is an explanatory
        message and the last item is ``-1``.
    """
    # Local import: only this function needs it.
    import tempfile

    deepix_placeholder = {'Not supported right now': 0}
    if vid_path is None:
        # cv.VideoCapture(None) raises; report the problem instead.
        return vid_path, deepix_placeholder, -1, vid_path, 'Error. Record a video first.', -1

    cap = cv.VideoCapture(vid_path)
    all_frames = []
    inference_images = []
    inference_bboxes = []
    inference_depths = []
    try:
        input_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        input_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        frame_counter = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_counter % 5 == 0:
                frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
                img, bbox, depth_img = analyze_face(frame)
                if bbox and (depth_img is not None):
                    inference_images.append(img)
                    inference_bboxes.append(bbox)
                    inference_depths.append(depth_img)
                all_frames.append(img)
            frame_counter += 1
    finally:
        # Release the capture even if a frame fails to process.
        cap.release()

    if not inference_images:
        return vid_path, deepix_placeholder, -1, vid_path, 'Faces too small or not found', -1

    res_dsdg, scores = dsdg_model_inference(inference_images, inference_bboxes, inference_depths)
    cls_dsdg = 'Real' if res_dsdg >= dsdg_thresh else 'Spoof'

    # The images drawn on here are the same objects stored in all_frames, so
    # the annotations end up in the output video.
    for img, bbox, score in zip(inference_images, inference_bboxes, scores):
        x, y, x2, y2 = bbox
        w = x2 - x
        h = y2 - y
        frame_cls = 'Real' if score >= dsdg_thresh else 'Spoof'
        color_dsdg = (0, 255, 0) if frame_cls == 'Real' else (255, 0, 0)
        # NOTE(review): the box colour follows the per-frame verdict while
        # the caption shows the overall verdict - kept as in the original.
        text = f'{cls_dsdg} {w}*{h}'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30), cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    # A unique file per request: a fixed name would let concurrent requests
    # overwrite each other's output.
    with tempfile.NamedTemporaryFile(
            prefix='output_dsdg_', suffix='.mp4', delete=False) as tmp_file:
        output_vid_path = tmp_file.name

    fourcc = cv.VideoWriter_fourcc(*'mp4v')
    out_dsdg = cv.VideoWriter(output_vid_path, fourcc, 6.0, (input_width, input_height))
    try:
        for img in all_frames:
            out_dsdg.write(cv.cvtColor(img, cv.COLOR_RGB2BGR))
    finally:
        out_dsdg.release()

    text_dsdg = f'Label: {cls_dsdg}, average real confidence: {res_dsdg}\nFrames used: {len(scores)}\nConfidences: {scores}'
    return vid_path, deepix_placeholder, -1, output_vid_path, text_dsdg, res_dsdg
def upload_to_s3(vid_path, app_version, *labels):
    """Upload a flagged video to S3 with its labels encoded in the key.

    Args:
        vid_path: Path of the video to upload, or ``None``.
        app_version: Version string used as a sub-folder of the key.
        *labels: Label values; the first is the user-selected true label
            (``2`` is remapped to ``-1``) and the second-to-last is ``-2``
            while detection has not been run yet.

    Returns:
        A status message: one of the ``'Error. ...'`` strings when a
        precondition fails, otherwise ``'Successfully uploaded'``.
    """
    folder = 'demo'
    bucket_name = 'livenessng'

    if vid_path is None:
        return 'Error. Take a photo first.'
    if labels[-2] == -2:
        return 'Error. Run the detection first.'
    if labels[0] is None:
        return 'Error. Select the true label first.'

    # *labels arrives as an immutable tuple; assigning into it raised
    # TypeError, so work on a list copy.
    labels = list(labels)
    if labels[0] == 2:
        labels[0] = -1

    s3 = boto3.client('s3')
    encoded_labels = '_'.join(str(int(label)) for label in labels)
    random_string = str(uuid.uuid4()).split('-')[-1]
    video_name = f"{folder}/{app_version}/{encoded_labels}_{random_string}.mp4"
    with open(vid_path, 'rb') as video_file:
        s3.upload_fileobj(video_file, bucket_name, video_name)
    return 'Successfully uploaded'
# Gradio UI: webcam input and controls | model outputs | flagging controls.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            input_vid = gr.Video(format='mp4', source='webcam')
            dsdg_thresh = gr.Slider(
                value=DSDG_THRESHOLD,
                label='DSDG threshold',
                maximum=300,
                step=5,
            )
            btn_run = gr.Button(value="Run")
        with gr.Column():
            # Order matches the 6-tuple returned by process_video. The two
            # hidden Number fields start at -2, which upload_to_s3 reads as
            # "detection has not been run yet".
            outputs = [
                gr.Video(label='DeePixBiS', format='mp4'),
                gr.Label(num_top_classes=2, label='DeePixBiS'),
                gr.Number(visible=False, value=-2),
                gr.Video(label='DSDG', format='mp4'),
                gr.Textbox(label='DSDG'),
                gr.Number(visible=False, value=-2),
            ]
        with gr.Column():
            radio = gr.Radio(
                ["Spoof", "Real", "None"],
                label="True label",
                type='index',
            )
            flag = gr.Button(value="Flag")
            status = gr.Textbox()

    btn_run.click(process_video, [input_vid, dsdg_thresh], outputs)

    # Hidden field that carries the app version into the upload handler.
    app_version_block = gr.Textbox(value=app_version, visible=False)
    flag_inputs = [input_vid, app_version_block, radio, outputs[2], outputs[5]]
    flag.click(upload_to_s3, flag_inputs, [status], show_progress=True)


if __name__ == '__main__':
    demo.queue(concurrency_count=2)
    demo.launch(share=False)