import subprocess

subprocess.run(["sh", "tddfa/build.sh"])

import gradio as gr
from gradio.components import Dropdown
import cv2 as cv
import torch
from torchvision import transforms
from DeePixBiS.Model import DeePixBiS
import yaml
import numpy as np
import pandas as pd
from skimage.io import imread, imsave
# from tddfa.TDDFA import TDDFA
from tddfa.utils.depth import depth
from tddfa.TDDFA_ONNX import TDDFA_ONNX
import torch.optim as optim
from DSDG.DUM.models.CDCNs_u import Conv2d_cd, CDCN_u
import io
import uuid
from PIL import Image
import boto3
from utils.blur_filter import filter_frames
import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '4'

app_version = 'dsdg_vid_3'

device = torch.device("cpu")
labels = ['Live', 'Spoof']
PIX_THRESHOLD = 0.45
DSDG_THRESHOLD = 80.0
DSDG_FACTOR = 1000000
DSDG_PERCENTILE = 40
MIN_FACE_WIDTH_THRESHOLD = 210
examples = [
    ['examples/1_1_21_2_33_scene_fake.jpg'],
    ['examples/frame150_real.jpg'],
    ['examples/1_2.avi_125_real.jpg'],
    ['examples/1_3.avi_25_fake.jpg']]

faceClassifier = cv.CascadeClassifier('./DeePixBiS/Classifiers/haarface.xml')

tfms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# deepix_model = DeePixBiS(pretrained=False)
# deepix_model.load_state_dict(torch.load('./DeePixBiS/DeePixBiS.pth'))
# deepix_model.eval()

depth_config_path = 'tddfa/configs/mb1_120x120.yml'
cfg = yaml.load(open(depth_config_path), Loader=yaml.SafeLoader)
tddfa = TDDFA_ONNX(gpu_mode=False, **cfg)

cdcn_model = CDCN_u(basic_conv=Conv2d_cd, theta=0.7)
cdcn_model = cdcn_model.to(device)
weights = torch.load('./DSDG/DUM/checkpoint/CDCN_U_P1_updated.pkl', map_location=device)
cdcn_model.load_state_dict(weights)
optimizer = optim.Adam(cdcn_model.parameters(), lr=0.001, weight_decay=0.00005)
cdcn_model.eval()


class Normaliztion_valtest(object):
    """Same as mxnet: normalize into [-1, 1] via image = (image - 127.5) / 128."""

    def __call__(self, image_x):
        image_x = (image_x - 127.5) / 128  # [-1, 1]
        return image_x


def find_largest_face(faces):
    # Return the face with the largest bounding-box area.
    largest_face = None
    largest_area = 0
    for face in faces:
        x, y, w, h = face
        area = w * h
        if area > largest_area:
            largest_area = area
            largest_face = face
    return largest_face


def extract_face(img):
    # Detect faces with the Haar cascade and return the largest one, or None.
    face = None
    if img is None:
        return face
    grey = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    faces = faceClassifier.detectMultiScale(
        grey, scaleFactor=1.1, minNeighbors=4)
    if len(faces):
        face = find_largest_face(faces)
    return face


# NOTE: unused while the DeePixBiS model above is commented out.
def deepix_model_inference(img, bbox):
    x, y, x2, y2 = bbox
    faceRegion = img[y:y2, x:x2]
    faceRegion = tfms(faceRegion)
    faceRegion = faceRegion.unsqueeze(0)
    mask, binary = deepix_model.forward(faceRegion)
    res_deepix = torch.mean(mask).item()
    cls_deepix = 'Real' if res_deepix >= PIX_THRESHOLD else 'Spoof'
    confidences_deepix = {'Real confidence': res_deepix}
    color_deepix = (0, 255, 0) if cls_deepix == 'Real' else (255, 0, 0)
    img_deepix = cv.rectangle(img.copy(), (x, y), (x2, y2), color_deepix, 2)
    cv.putText(img_deepix, cls_deepix, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_deepix)
    cls_deepix = 1 if cls_deepix == 'Real' else 0
    return img_deepix, confidences_deepix, cls_deepix


def get_depth_img(img, bbox):
    # Reconstruct the dense 3D face with 3DDFA and render its depth map.
    bbox_conf = list(bbox)
    bbox_conf.append(1)
    param_lst, roi_box_lst = tddfa(img, [bbox_conf])
    ver_lst = tddfa.recon_vers(param_lst, roi_box_lst, dense_flag=True)
    depth_img = depth(img, ver_lst, tddfa.tri,
                      with_bg_flag=False)
    return depth_img


def analyze_face(img):
    # Detect the face and, if it is large enough, compute its depth map.
    face = extract_face(img)
    if face is None:
        return img, (), None
    x, y, w, h = face
    x2 = x + w
    y2 = y + h
    bbox = (x, y, x2, y2)
    if w < MIN_FACE_WIDTH_THRESHOLD:
        color_dsdg = (0, 0, 0)
        text = f'Small res ({w}*{h})'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30),
                   cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
        # cls_dsdg = -1
        return img, bbox, None
    depth_img = get_depth_img(img, bbox)
    return img, bbox, depth_img


def prepare_data_dsdg(images, boxes, depths):
    # Crop face and depth regions, resize to the CDCN input sizes and normalize.
    transform = transforms.Compose([Normaliztion_valtest()])
    files_total = len(images)
    image_x = np.zeros((files_total, 256, 256, 3))
    depth_x = np.ones((files_total, 32, 32))
    for i, (image, bbox, depth_img) in enumerate(
            zip(images, boxes, depths)):
        x, y, x2, y2 = bbox
        depth_img = cv.cvtColor(depth_img, cv.COLOR_BGR2GRAY)
        image = image[y:y2, x:x2]
        depth_img = depth_img[y:y2, x:x2]
        image_x[i, :, :, :] = cv.resize(image, (256, 256))
        # transform to binary mask --> threshold = 0
        depth_x[i, :, :] = cv.resize(depth_img, (32, 32))
    image_x = image_x.transpose((0, 3, 1, 2))
    image_x = transform(image_x)
    image_x = torch.from_numpy(image_x.astype(float)).float()
    depth_x = torch.from_numpy(depth_x.astype(float)).float()
    return image_x, depth_x


def dsdg_model_inference(imgs, bboxes, depth_imgs):
    # Score each frame with the CDCN_u model; returns a list of per-frame scores.
    with torch.no_grad():
        map_score_list = []
        image_x, map_x = prepare_data_dsdg(imgs, bboxes, depth_imgs)
        # get the inputs
        image_x = image_x.unsqueeze(0)
        map_x = map_x.unsqueeze(0)
        inputs = image_x.to(device)
        test_maps = map_x.to(device)
        optimizer.zero_grad()

        scores = []
        map_score = 0.0
        for frame_t in range(inputs.shape[1]):
            mu, logvar, map_x, x_concat, x_Block1, x_Block2, x_Block3, x_input = cdcn_model(inputs[:, frame_t, :, :, :])
            score_norm = torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
            score = score_norm.item()
            if score > 10:
                score = 0.0
            scores.append(score * DSDG_FACTOR)
            map_score += score_norm
        return scores


# NOTE: legacy single-image path; the Gradio app below only calls process_video.
def inference(img, dsdg_thresh):
    face = extract_face(img)
    if face is not None:
        x, y, w, h = face
        x2 = x + w
        y2 = y + h
        bbox = (x, y, x2, y2)
        # img_deepix, confidences_deepix, cls_deepix = deepix_model_inference(img, bbox)
        img_dsdg, confidences_dsdg, cls_dsdg = dsdg_model_inference(img, bbox, dsdg_thresh)
        return img, {}, 2, img_dsdg, confidences_dsdg, cls_dsdg
    else:
        return img, {}, None, img, {}, None


def process_video(vid_path, dsdg_thresh):
    cap = cv.VideoCapture(vid_path)
    input_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
    input_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
    most_focused = filter_frames(cap)
    inference_images = []
    inference_bboxes = []
    inference_depths = []
    for frame in most_focused:
        # Run face detection and depth estimation on the current frame
        img, bbox, depth_img = analyze_face(frame)
        if bbox and (depth_img is not None):
            inference_images.append(img)
            inference_bboxes.append(bbox)
            inference_depths.append(depth_img)
    if not inference_images:
        return vid_path, {'Not supported right now': 0}, -1, vid_path, 'Faces too small or not found', -1
    scores = dsdg_model_inference(inference_images, inference_bboxes, inference_depths)
    res_dsdg = np.percentile(scores, DSDG_PERCENTILE)
    cls_dsdg = 'Real' if res_dsdg >= dsdg_thresh else 'Spoof'
    for img, bbox, score in zip(inference_images, inference_bboxes, scores):
        x, y, x2, y2 = bbox
        w = x2 - x
        h = y2 - y
        frame_cls = 'Real' if score >= dsdg_thresh else 'Spoof'
        color_dsdg = (0, 255, 0) if frame_cls == 'Real' else (0, 0, 255)
        text = f'{cls_dsdg} {w}*{h}'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30),
                   cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    fourcc = cv.VideoWriter_fourcc(*'mp4v')
    output_vid_path = 'output_dsdg.mp4'
    out_dsdg = cv.VideoWriter(output_vid_path, fourcc, 6.0, (input_width, input_height))
    for img in most_focused:
        # Write the frame to the output video
        out_dsdg.write(img)
    out_dsdg.release()

    text_dsdg = f'Label: {cls_dsdg}, average real confidence: {res_dsdg}\nFrames used: {len(scores)}\nConfidences: {scores}'
    return vid_path, {'Not supported right now': 0}, -1, output_vid_path, text_dsdg, res_dsdg


def upload_to_s3(vid_path, app_version, *labels):
    folder = 'demo'
    bucket_name = 'livenessng'
    labels = list(labels)  # *labels arrives as a tuple; make it mutable

    if vid_path is None:
        return 'Error. Take a photo first.'
    elif labels[-2] == -2:
        return 'Error. Run the detection first.'
    elif labels[0] is None:
        return 'Error. Select the true label first.'
    elif labels[0] == 2:
        labels[0] = -1

    # Initialize S3 client
    s3 = boto3.client('s3')

    # Encode labels and app version in the video file name
    encoded_labels = '_'.join([str(int(label)) for label in labels])
    random_string = str(uuid.uuid4()).split('-')[-1]
    video_name = f"{folder}/{app_version}/{encoded_labels}_{random_string}.mp4"

    # Upload video to S3
    with open(vid_path, 'rb') as video_file:
        s3.upload_fileobj(video_file, bucket_name, video_name)

    # Return the upload status
    status = 'Successfully uploaded'
    return status


demo = gr.Blocks()

with demo:
    with gr.Row():
        with gr.Column():
            input_vid = gr.Video(format='mp4', source='webcam')
            dsdg_thresh = gr.Slider(value=DSDG_THRESHOLD, label='DSDG threshold', maximum=300, step=5)
            btn_run = gr.Button(value="Run")
        with gr.Column():
            outputs = [
                gr.Video(label='DeePixBiS', format='mp4'),
                gr.Label(num_top_classes=2, label='DeePixBiS'),
                gr.Number(visible=False, value=-2),
                gr.Video(label='DSDG', format='mp4'),
                gr.Textbox(label='DSDG'),
                gr.Number(visible=False, value=-2)]
        with gr.Column():
            radio = gr.Radio(
                ["Spoof", "Real", "None"], label="True label", type='index')
            flag = gr.Button(value="Flag")
            status = gr.Textbox()

    # example_block = gr.Examples(examples, [input_vid], outputs)
    btn_run.click(process_video, [input_vid, dsdg_thresh], outputs)
    app_version_block = gr.Textbox(value=app_version, visible=False)
    flag.click(
        upload_to_s3,
        [input_vid, app_version_block, radio] + [outputs[2], outputs[5]],
        [status],
        show_progress=True)

if __name__ == '__main__':
    demo.queue(concurrency_count=2)
    demo.launch(share=False)