Spaces:
Sleeping
Sleeping
import sys, os | |
import numpy as np | |
import cv2 | |
sys.path.append(os.getcwd()) | |
from face_detection import FaceDetection | |
from deformation_detection import DeformationDetection | |
from deepfake_detection import DeepfakeDetection | |
from utils import readb64, img2base64, binary2video, extract_frames | |
class FaceFakePipelineImage:
    """Fake-face pipeline for a single image/frame.

    Stages, in order: face detection, deformation detection on every detected
    face, and -- only when no face is flagged by the deformation stage --
    deepfake detection. The outcome is wrapped in a job-style response dict.
    """

    def __init__(self, device='cpu', gpu_id=0, weights='weights/model_params_ffpp_c23.pickle'):
        """Create the three detectors, each with ``batch_size=64``.

        ``device``, ``gpu_id`` and ``weights`` are kept in the signature for
        callers that pass them, but they are not forwarded to any detector
        in this constructor.
        """
        self.face_detection = FaceDetection(batch_size=64)
        self.deformation_detection = DeformationDetection(batch_size=64)
        self.deepfake_detection = DeepfakeDetection(batch_size=64)

    def preprocess(self, data):
        """Decode the request payload into a list holding at most one image.

        Args:
            data: Mapping whose ``"images"`` entry is either one encoded image
                or a list of them. When the key is absent, ``data`` itself is
                treated as the single encoded image (original fallback).

        Returns:
            List with the result of ``readb64`` for each supplied image.

        Raises:
            ValueError: If more than one image is supplied.
        """
        # Read with ``get`` rather than ``pop`` so the caller's payload is not
        # modified as a side effect of running the pipeline.
        image_base64 = data.get("images", data)
        if not isinstance(image_base64, list):
            image_base64 = [image_base64]
        elif len(image_base64) > 1:
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError("FaceFakePipelineImage only accepts 1 image/frame")
        return [readb64(image) for image in image_base64]

    def inference(self, images, config_payload):
        """Run the detectors on the first frame and assemble per-face results.

        Args:
            images: Decoded images as produced by ``preprocess``.
            config_payload: Mapping providing ``face_detection_threshold``,
                ``deformation_detection_threshold`` and
                ``deepfake_detection_threshold``.

        Returns:
            Dict with ``results`` (one entry per detected face) and a
            human-readable ``message``.
        """
        frame_detections = self.face_detection(
            images,
            confidence_threshold=config_payload['face_detection_threshold'],
        )
        if len(frame_detections) == 0:
            return {'results': [],
                    'message': 'No face detected'}

        # Only the first frame is processed.
        frame = frame_detections[0]
        face_images = frame['face_images']
        if len(face_images) == 0:
            # A frame entry without any face must not be reported as "Real".
            return {'results': [],
                    'message': 'No face detected'}

        deform_labels, deform_probs, heatmaps = self.deformation_detection(
            face_images,
            confidence_threshold=config_payload['deformation_detection_threshold'],
            return_heatmap=True,
        )

        deformation_flagged = 'fake' in deform_labels
        deepfake_labels = deepfake_probs = None
        if not deformation_flagged:
            # No face was flagged as deformed: run the deepfake detector too.
            deepfake_labels, deepfake_probs = self.deepfake_detection(
                face_images,
                confidence_threshold=config_payload['deepfake_detection_threshold'],
            )

        results = []
        for i, _ in enumerate(face_images):
            if deepfake_labels is None:
                deepfake_assessment = None
            else:
                deepfake_assessment = {
                    'is_fake': bool(deepfake_labels[i] == 'fake'),
                    'fake_score': deepfake_probs[i],
                }
            results.append({
                'deformation_assessment': {
                    'is_fake': bool(deform_labels[i] == 'fake'),
                    'fake_score': deform_probs[i],
                    'heatmap': img2base64(heatmaps[i]),
                },
                # Key spelling is part of the response contract; keep as is.
                'deepfake_assesment': deepfake_assessment,
                'bounding_box': frame['face_bbox'][i],
            })

        any_fake = deformation_flagged or 'fake' in deepfake_labels
        message = 'It likely detects Fake' if any_fake else 'It likely detects Real'
        return {'results': results,
                'message': message}

    def get_response(self, inference_result):
        """Wrap an ``inference`` result in the job-style response envelope."""
        return {
            'job': {
                'result': {
                    'status': 'success',
                    'analytic_type': 'FAKE_DETECTION',
                    'results': inference_result['results'],
                },
            },
            'message': inference_result['message'],
            'ok': True,
        }

    def __call__(self, data, config_payload):
        """Run preprocess -> inference -> response formatting for one payload."""
        images = self.preprocess(data)
        inference_result = self.inference(images, config_payload)
        return self.get_response(inference_result)
class FaceFakePipelineVideo:
    """Fake-face pipeline for a video file.

    Frames are sampled from the video, faces are detected per frame, and each
    frame is scored with the highest deepfake probability among its faces.
    The video-level score is the mean of those per-frame scores.
    """

    def __init__(self, device='cpu', gpu_id=0, weights='weights/model_params_ffpp_c23.pickle'):
        """Create the face detector (``batch_size=1``) and deepfake detector (``batch_size=64``).

        ``device``, ``gpu_id`` and ``weights`` are kept in the signature for
        callers that pass them, but they are not forwarded to any detector
        in this constructor.
        """
        self.face_detection = FaceDetection(batch_size=1)
        self.deepfake_detection = DeepfakeDetection(batch_size=64)

    def preprocess(self, video_path, config_payload):
        """Sample frames from ``video_path`` via ``extract_frames``.

        ``config_payload`` must provide ``frame_sampling_interval`` and
        ``frame_sampling_max``, which are passed through as ``interval`` and
        ``max_frames``.
        """
        return extract_frames(
            video_path,
            interval=config_payload['frame_sampling_interval'],
            max_frames=config_payload['frame_sampling_max'],
        )

    def inference(self, images, config_payload):
        """Score the sampled frames and produce one video-level verdict.

        Args:
            images: Frames as produced by ``preprocess``.
            config_payload: Mapping providing ``face_detection_threshold`` and
                ``deepfake_detection_threshold``.

        Returns:
            Dict with ``results`` (a single video-level entry, or empty when
            no face could be scored) and a human-readable ``message``.
        """
        no_face = {'results': [],
                   'message': 'No face detected'}

        frame_detections = self.face_detection(
            images,
            confidence_threshold=config_payload['face_detection_threshold'],
        )
        if len(frame_detections) == 0:
            return no_face

        threshold = config_payload['deepfake_detection_threshold']
        frame_probs = []
        for frame in frame_detections:
            faces = frame['face_images']
            if len(faces) == 0:
                # Nothing to score in this frame; max() of an empty
                # sequence would raise ValueError.
                continue
            _, deepfake_probs = self.deepfake_detection(
                faces,
                confidence_threshold=threshold,
            )
            if len(deepfake_probs) == 0:
                continue
            # Pick the highest face fakeness as the frame's fake probability.
            frame_probs.append(max(deepfake_probs))

        if not frame_probs:
            # Averaging an empty list would produce NaN; report no faces instead.
            return no_face

        # Convert the NumPy scalar to a plain float so the response stays
        # serializable regardless of the detector's dtype.
        overall_prob = float(np.mean(frame_probs))
        overall_label = bool(overall_prob >= threshold)
        message = 'It likely detects Fake' if overall_label else 'It likely detects Real'

        return {
            'results': [{
                'deformation_assessment': None,
                # Key spelling is part of the response contract; keep as is.
                'deepfake_assesment': {
                    'is_fake': overall_label,
                    'fake_score': overall_prob,
                },
                'bounding_box': None,
            }],
            'message': message,
        }

    def get_response(self, inference_result):
        """Wrap an ``inference`` result in the job-style response envelope."""
        return {
            'job': {
                'result': {
                    'status': 'success',
                    'analytic_type': 'FAKE_DETECTION',
                    'results': inference_result['results'],
                },
            },
            'message': inference_result['message'],
            'ok': True,
        }

    def __call__(self, video_path, config_payload):
        """Run frame sampling -> inference -> response formatting for one video."""
        images = self.preprocess(video_path, config_payload)
        inference_result = self.inference(images, config_payload)
        return self.get_response(inference_result)
def image_test():
    """Smoke-test the image pipeline on the bundled sample image and print the response.

    Raises:
        FileNotFoundError: If the sample image cannot be read.
    """
    from pprint import pprint

    sample_path = 'sample_files/wefie.jpg'

    # init pipeline
    pipeline = FaceFakePipelineImage()
    config_payload = {
        'face_detection_threshold': 0.997,
        'deformation_detection_threshold': 0.6,
        'deepfake_detection_threshold': 0.65,
    }

    # cv2.imread signals an unreadable/missing file by returning None instead
    # of raising, so fail here with a clear message.
    image = cv2.imread(sample_path)
    if image is None:
        raise FileNotFoundError('Could not read sample image: ' + sample_path)

    payload = {'images': [img2base64(image)]}
    pred = pipeline(payload, config_payload)

    # show results
    pprint(pred)
def video_test():
    """Smoke-test the video pipeline on the bundled sample video and print the response."""
    from pprint import pprint

    video_pipeline = FaceFakePipelineVideo()
    config_payload = {
        'face_detection_threshold': 0.997,
        'deepfake_detection_threshold': 0.65,
        'frame_sampling_interval': 60,
        'frame_sampling_max': 50,
    }

    # Forward slash instead of a backslash: "\m" is an invalid escape sequence
    # in a normal string literal, and a backslash separator only resolves on
    # Windows. This also matches the path style used by image_test.
    pred = video_pipeline('sample_files/messi_deepfake.mp4', config_payload)
    pprint(pred)
if __name__ == "__main__":
    # Script entry point: run the image smoke test, then the video one.
    for smoke_test in (image_test, video_test):
        smoke_test()