Deepface_detect / App /app.py
Vinh.Vu
Update Dense to accept quantization_config from newer Keras
0164c52
import os
import base64
import math
import logging
import subprocess
import cv2
import numpy as np
import imageio_ffmpeg
from mtcnn import MTCNN
from ultralytics import YOLO
from flask import Flask
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.efficientnet import preprocess_input
import keras.src.layers.normalization.batch_normalization as _bn_module
import keras.src.layers.core.dense as _dense_module
import sys
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
stream=sys.stderr
)
# Monkey-patch BatchNormalization to accept legacy renorm kwargs
_OrigBN = _bn_module.BatchNormalization
_orig_bn_init = _OrigBN.__init__
def _patched_bn_init(self, *args, **kwargs):
kwargs.pop('renorm', None)
kwargs.pop('renorm_clipping', None)
kwargs.pop('renorm_momentum', None)
_orig_bn_init(self, *args, **kwargs)
_OrigBN.__init__ = _patched_bn_init
# Monkey-patch Dense to accept quantization_config from newer Keras
_OrigDense = _dense_module.Dense
_orig_dense_init = _OrigDense.__init__
def _patched_dense_init(self, *args, **kwargs):
kwargs.pop('quantization_config', None)
_orig_dense_init(self, *args, **kwargs)
_OrigDense.__init__ = _patched_dense_init
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(__file__), 'uploads')
app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get('MAX_UPLOAD_MB', 50)) * 1024 * 1024
ALLOWED_EXTENSIONS = {'mp4', 'avi', 'mov', 'mkv', 'wmv'}
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Load the trained model
MODEL_PATH = os.environ.get(
'MODEL_PATH',
os.path.join(os.path.dirname(__file__), 'models', 'best_model.keras')
)
if not os.path.exists(MODEL_PATH):
# Fallback for local development
MODEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'tmp_checkpoint', 'best_model.keras')
logger.info('Loading model from %s', MODEL_PATH)
model = load_model(MODEL_PATH)
logger.info('Model loaded successfully')
INPUT_SIZE = 224
MIN_FACE_SIZE = 90 # same as 02-prepare_fake_real_dataset.py
# Initialize MTCNN face detector (same as training pipeline 01-crop_faces_with_mtcnn.py)
logger.info('Initializing MTCNN face detector')
mtcnn_detector = MTCNN()
logger.info('MTCNN face detector ready')
# Initialize YOLO face detector (for processed video overlay only)
logger.info('Initializing YOLO face detector')
FACE_MODEL_PATH = os.path.join(os.path.dirname(__file__), 'yolov8n-face.pt')
face_detector = YOLO(FACE_MODEL_PATH)
logger.info('YOLO face detector ready')
# In-memory job store: job_id -> {status, result, ...}
jobs = {}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def face_to_base64(face_rgb):
face_bgr = cv2.cvtColor(face_rgb, cv2.COLOR_RGB2BGR)
_, buffer = cv2.imencode('.png', face_bgr)
return base64.b64encode(buffer).decode('utf-8')
def reencode_to_h264(input_path, output_path=None):
"""Re-encode a video to H.264 for browser compatibility. Overwrites in-place if no output_path."""
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
if output_path is None:
output_path = input_path
tmp = input_path + '.reencode.mp4'
cmd = [
ffmpeg_exe, '-y', '-i', input_path,
'-c:v', 'libx264', '-preset', 'fast',
'-movflags', '+faststart', '-pix_fmt', 'yuv420p',
tmp
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error('ffmpeg reencode failed: %s', result.stderr)
try:
os.remove(tmp)
except OSError:
pass
return False
try:
os.replace(tmp, output_path)
except OSError:
os.remove(input_path)
os.rename(tmp, output_path)
return True
def scale_frame(frame):
"""Scale frame exactly like 00-convert_video_to_image.py"""
h, w = frame.shape[:2]
if w < 300:
scale_ratio = 2
elif w > 1900:
scale_ratio = 0.33
elif w > 1000:
scale_ratio = 0.5
else:
scale_ratio = 1
if scale_ratio != 1:
new_w = int(w * scale_ratio)
new_h = int(h * scale_ratio)
frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
return frame
def extract_faces_from_video(video_path):
"""Extract faces using MTCNN — matching training pipeline (01-crop_faces_with_mtcnn.py)."""
logger.info('Extracting faces from video: %s', video_path)
faces = []
cap = cv2.VideoCapture(video_path)
frame_rate = cap.get(cv2.CAP_PROP_FPS)
if frame_rate == 0:
logger.warning('Could not read frame rate from video')
cap.release()
return faces
while cap.isOpened():
frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
ret, frame = cap.read()
if not ret:
break
if frame_id % math.floor(frame_rate) == 0:
# Step 1: Scale frame (same as 00-convert_video_to_image.py)
frame = scale_frame(frame)
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w = image_rgb.shape[:2]
# Step 2: MTCNN face detection (same as 01-crop_faces_with_mtcnn.py)
results = mtcnn_detector.detect_faces(image_rgb)
num_faces = len(results)
for result in results:
bounding_box = result['box']
confidence = result['confidence']
# Same logic as training: if single face keep it, if multiple only keep > 0.95
if num_faces < 2 or confidence > 0.95:
bx, by, bw, bh = bounding_box
margin_x = bw * 0.3
margin_y = bh * 0.3
x1 = int(max(0, bx - margin_x))
x2 = int(min(w, bx + bw + margin_x))
y1 = int(max(0, by - margin_y))
y2 = int(min(h, by + bh + margin_y))
crop = image_rgb[y1:y2, x1:x2]
# Step 3: Filter small faces (same as 02-prepare_fake_real_dataset.py MIN_IMAGE_SIZE=90)
if crop.shape[0] < MIN_FACE_SIZE or crop.shape[1] < MIN_FACE_SIZE:
continue
if crop.size > 0:
crop_resized = cv2.resize(crop, (INPUT_SIZE, INPUT_SIZE))
faces.append(crop_resized)
cap.release()
logger.info('Face extraction complete — %d faces found', len(faces))
return faces
def create_processed_video(video_path, output_path, face_scores=None):
"""Create video with face bounding boxes using ffmpeg drawbox (much faster than OpenCV)."""
logger.info('Creating processed video with bounding boxes: %s', output_path)
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
# Sample a few frames spread across the video to detect faces
sample_count = min(5, max(1, int(duration))) # ~1 sample per second, max 5
sample_positions = [int(i * total_frames / sample_count) for i in range(sample_count)]
# Collect all face boxes across sampled frames
all_boxes = []
for pos in sample_positions:
cap.set(cv2.CAP_PROP_POS_FRAMES, pos)
ret, frame = cap.read()
if not ret:
continue
results = face_detector(frame, verbose=False)[0]
for box in results.boxes:
if box.conf[0] > 0.5:
bx1, by1, bx2, by2 = map(int, box.xyxy[0])
all_boxes.append((max(0, bx1), max(0, by1), bx2, by2))
cap.release()
# Build ffmpeg drawbox filter from detected boxes
ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
if all_boxes:
# Use the most common box region (largest by area) for a stable overlay
# Deduplicate similar boxes by averaging nearby ones
unique_boxes = []
for box in all_boxes:
merged = False
for i, ub in enumerate(unique_boxes):
# If boxes overlap significantly, merge them
if (abs(box[0] - ub[0]) < 40 and abs(box[1] - ub[1]) < 40 and
abs(box[2] - ub[2]) < 40 and abs(box[3] - ub[3]) < 40):
unique_boxes[i] = (
(ub[0] + box[0]) // 2, (ub[1] + box[1]) // 2,
(ub[2] + box[2]) // 2, (ub[3] + box[3]) // 2
)
merged = True
break
if not merged:
unique_boxes.append(box)
drawbox_filters = []
for (x1, y1, x2, y2) in unique_boxes:
w = x2 - x1
h = y2 - y1
drawbox_filters.append(f"drawbox=x={x1}:y={y1}:w={w}:h={h}:color=green:t=2")
filter_str = ','.join(drawbox_filters)
else:
filter_str = 'null'
cmd = [
ffmpeg_exe, '-y', '-i', video_path,
'-vf', filter_str,
'-c:v', 'libx264', '-preset', 'fast',
'-movflags', '+faststart', '-pix_fmt', 'yuv420p',
output_path
]
logger.info('Running ffmpeg with %d face boxes', len(all_boxes))
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error('ffmpeg drawbox failed: %s', result.stderr[-500:])
else:
logger.info('Processed video saved: %s', output_path)
def predict_deepfake(faces):
if not faces:
logger.warning('No faces to predict on')
return None, 0, []
logger.info('Running prediction on %d face(s)', len(faces))
face_array = preprocess_input(np.array(faces, dtype='float32'))
predictions = model.predict(face_array, verbose=0)
flat_preds = predictions.flatten()
# Use top-K mean: average the top 30% of predictions (at least 3)
# Rationale: real videos have many high-confidence real frames; fake videos have NONE
sorted_desc = np.sort(flat_preds)[::-1] # highest first
k = max(3, int(len(sorted_desc) * 0.3))
top_k = sorted_desc[:k]
avg_prediction = float(np.mean(top_k))
# Write diagnostics to file
diag_path = os.path.join(os.path.dirname(__file__), 'diag_log.txt')
with open(diag_path, 'a') as f:
f.write(f'Raw predictions: min={float(np.min(predictions)):.4f}, max={float(np.max(predictions)):.4f}, top{k}_mean={avg_prediction:.4f}, mean={float(np.mean(predictions)):.4f}\n')
f.write(f'All scores (sorted desc): {sorted_desc.tolist()}\n')
f.write(f'Top-{k} used: {top_k.tolist()}\n')
f.write(f'Num faces: {len(faces)}\n\n')
logger.info('Raw predictions: min=%.4f, max=%.4f, top%d_mean=%.4f, mean=%.4f, n=%d',
float(np.min(predictions)), float(np.max(predictions)),
k, avg_prediction, float(np.mean(flat_preds)), len(flat_preds))
# Build per-face details (up to 5 faces sorted by relevance)
is_real = avg_prediction > 0.5
# Sort face indices by score: highest first for REAL, lowest first for FAKE
sorted_indices = np.argsort(flat_preds)[::-1] if is_real else np.argsort(flat_preds)
indices = sorted_indices[:5].tolist()
faces_detail = []
for i in indices:
faces_detail.append({
'thumbnail': face_to_base64(faces[i]),
'score': float(predictions[i][0])
})
logger.info('Prediction complete — avg score: %.4f, faces: %d', avg_prediction, len(faces))
return avg_prediction, len(faces), faces_detail
def cleanup_old_uploads(exclude=None):
"""Delete all files in the upload folder except those in exclude."""
exclude = set(exclude or [])
folder = app.config['UPLOAD_FOLDER']
for f in os.listdir(folder):
fpath = os.path.join(folder, f)
if os.path.isfile(fpath) and fpath not in exclude:
try:
os.remove(fpath)
except PermissionError:
pass
from route import routes
app.register_blueprint(routes)
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
logger.info('Starting Flask server on http://0.0.0.0:%d', port)
app.run(debug=False, host='0.0.0.0', port=port)