import os

# Silence TensorFlow's oneDNN notices and verbose C++ logging before TF loads.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import logging

logging.getLogger('tensorflow').setLevel(logging.ERROR)

import altair as alt
import numpy as np
import pandas as pd
import streamlit as st
import cv2
import torch
from transformers import AutoImageProcessor, AutoModelForImageClassification
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import load_model
import tempfile
import time
import urllib.request
import shutil

@st.cache_resource
def load_cnn_model():
    try:
        model = load_model('cnn_model.h5')
        st.success("CNN model loaded successfully!")
        return model
    except Exception as e:
        st.error(f"Error loading CNN model: {e}")
        st.warning("Please make sure 'cnn_model.h5' is in the current directory.")
        return None
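
# A minimal sanity check of the input contract assumed elsewhere in this app
# (see process_video): faces are resized to 128x128 RGB, scaled to [0, 1], and
# the model emits one sigmoid score per face, where a score below 0.5 is "Real".
#
#   model = load_cnn_model()
#   dummy = np.zeros((1, 128, 128, 3), dtype=np.float32)
#   score = float(model.predict(dummy, verbose=0)[0][0])
#   label = 'Real' if score < 0.5 else 'Fake'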


@st.cache_resource
def load_vit_components():
    image_processor = AutoImageProcessor.from_pretrained('Adieee5/deepfake-detection-f3net-cross', use_fast=True)
    model = AutoModelForImageClassification.from_pretrained('Adieee5/deepfake-detection-f3net-cross')
    return image_processor, model


@st.cache_resource
def load_face_net():
    model_file = "deploy.prototxt"
    weights_file = "res10_300x300_ssd_iter_140000.caffemodel"
    if os.path.exists(model_file) and os.path.exists(weights_file):
        return cv2.dnn.readNetFromCaffe(model_file, weights_file)
    return None
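
# Note: deploy.prototxt and the res10 SSD weights are the files distributed with
# OpenCV's DNN face-detector samples. They are not downloaded automatically here,
# so both must already sit next to this script for the DNN path to be available.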


@st.cache_resource
def load_haar_cascade():
    cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    if os.path.exists(cascade_path):
        return cv2.CascadeClassifier(cascade_path)
    return None


class CNNDeepfakeDetector:
    def __init__(self):
        # Thin wrapper so the cached Keras model can hang off the main detector.
        self.model = load_cnn_model()


class DeepfakeDetector:
    def __init__(self):
        st.info("Initializing Deepfake Detector... This may take a moment.")

        with st.spinner("Loading deepfake detection model..."):
            self.image_processor, self.model = load_vit_components()

        with st.spinner("Loading face detection model..."):
            self.face_net = load_face_net()
            self.use_dnn = self.face_net is not None
            # Load the Haar cascade unconditionally so the sidebar can switch
            # detectors at runtime even when the DNN files are present.
            self.face_cascade = load_haar_cascade()
            if self.use_dnn:
                st.success("Using DNN face detector (better for close-up faces)")
            elif self.face_cascade:
                st.warning("Using Haar cascade face detector as fallback")
            else:
                st.error("Cascade file not found")

        self.cnn_detector = CNNDeepfakeDetector()

        # Tracking state: per-face position history, a time-to-live measured in
        # processed frames, and a short buffer of recent classification results.
        self.face_history = {}
        self.face_history_max_size = 10
        self.face_ttl = 5
        self.next_face_id = 0
        self.result_buffer_size = 5
        self.processing_times = deque(maxlen=30)

        st.success("Models loaded successfully!")

    def detect_faces_haar(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
        # Haar cascades report no score, so a fixed placeholder confidence of 0.8 is attached.
        return [(x, y, w, h, 0.8) for (x, y, w, h) in faces]

    def detect_faces_dnn(self, frame):
        height, width = frame.shape[:2]
        # 300x300 input with the model's BGR channel means subtracted.
        blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
        self.face_net.setInput(blob)
        detections = self.face_net.forward()
        faces = []
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.5:
                box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])
                (x1, y1, x2, y2) = box.astype("int")
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(width, x2), min(height, y2)
                w, h = x2 - x1, y2 - y1
                if w > 0 and h > 0:
                    faces.append((x1, y1, w, h, confidence))
        return faces
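
    # The SSD head returns detections with shape (1, 1, N, 7): per detection,
    # index 2 holds the confidence and indices 3:7 the box corners normalized
    # to [0, 1], which is why the loop above slices detections[0, 0, i, 2] and
    # detections[0, 0, i, 3:7] before scaling back to pixel coordinates.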

    def calculate_iou(self, box1, box2):
        box1_x1, box1_y1, box1_w, box1_h = box1
        box2_x1, box2_y1, box2_w, box2_h = box2
        box1_x2, box1_y2 = box1_x1 + box1_w, box1_y1 + box1_h
        box2_x2, box2_y2 = box2_x1 + box2_w, box2_y1 + box2_h
        x_left = max(box1_x1, box2_x1)
        y_top = max(box1_y1, box2_y1)
        x_right = min(box1_x2, box2_x2)
        y_bottom = min(box1_y2, box2_y2)
        if x_right < x_left or y_bottom < y_top:
            return 0.0
        intersection_area = (x_right - x_left) * (y_bottom - y_top)
        box1_area = box1_w * box1_h
        box2_area = box2_w * box2_h
        return intersection_area / float(box1_area + box2_area - intersection_area)
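
    # Worked example: box1 = (0, 0, 10, 10) and box2 = (5, 5, 10, 10) overlap in
    # a 5x5 region, so IoU = 25 / (100 + 100 - 25), roughly 0.14. That is below
    # the 0.3 threshold used in track_faces, so these would count as different faces.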

    def track_faces(self, faces):
        matched_faces = []
        unmatched_detections = list(range(len(faces)))
        if not self.face_history:
            # No existing tracks: every detection starts a fresh track.
            for face in faces:
                face_id = self.next_face_id
                self.next_face_id += 1
                self.face_history[face_id] = {
                    'positions': deque([face[:4]], maxlen=self.face_history_max_size),
                    'ttl': self.face_ttl,
                    'label': None,
                    'confidence': 0.0,
                    'result_history': deque(maxlen=self.result_buffer_size)
                }
                matched_faces.append((face_id, face))
            return matched_faces

        for face_id in list(self.face_history.keys()):
            last_pos = self.face_history[face_id]['positions'][-1]
            best_match = -1
            best_iou = 0.3
            for i in unmatched_detections:
                iou = self.calculate_iou(last_pos, faces[i][:4])
                if iou > best_iou:
                    best_iou = iou
                    best_match = i
            if best_match != -1:
                matched_face = faces[best_match]
                self.face_history[face_id]['positions'].append(matched_face[:4])
                self.face_history[face_id]['ttl'] = self.face_ttl
                matched_faces.append((face_id, matched_face))
                unmatched_detections.remove(best_match)
            else:
                self.face_history[face_id]['ttl'] -= 1
                if self.face_history[face_id]['ttl'] <= 0:
                    del self.face_history[face_id]
                else:
                    predicted_face = (*last_pos, 0.5)
                    matched_faces.append((face_id, predicted_face))

        for i in unmatched_detections:
            face_id = self.next_face_id
            self.next_face_id += 1
            self.face_history[face_id] = {
                'positions': deque([faces[i][:4]], maxlen=self.face_history_max_size),
                'ttl': self.face_ttl,
                'label': None,
                'confidence': 0.0,
                'result_history': deque(maxlen=self.result_buffer_size)
            }
            matched_faces.append((face_id, faces[i]))
        return matched_faces
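
    # track_faces is a greedy IoU matcher: each existing track claims the
    # unmatched detection with the highest IoU above 0.3; tracks with no match
    # coast on their last known position (with a nominal 0.5 confidence) until
    # their TTL expires, and any leftover detections start new tracks.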

    def smooth_face_position(self, face_id):
        positions = self.face_history[face_id]['positions']
        if len(positions) == 1:
            return positions[0]
        total_weight = 0
        x, y, w, h = 0, 0, 0, 0
        for i, pos in enumerate(positions):
            weight = 2 ** i
            total_weight += weight
            x += pos[0] * weight
            y += pos[1] * weight
            w += pos[2] * weight
            h += pos[3] * weight
        return (int(x / total_weight), int(y / total_weight), int(w / total_weight), int(h / total_weight))
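
    # The weights grow as 2**i from oldest to newest stored position, so with
    # four boxes in the history the weights are 1, 2, 4, 8 and the most recent
    # box contributes 8/15 (just over half) of the smoothed position.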

    def update_face_classification(self, face_id, label, confidence):
        self.face_history[face_id]['result_history'].append((label, confidence))
        real_votes = 0
        fake_votes = 0
        total_confidence = 0.0
        for result_label, result_conf in self.face_history[face_id]['result_history']:
            if result_label == "Real":
                real_votes += 1
                total_confidence += result_conf
            elif result_label == "Fake":
                fake_votes += 1
                total_confidence += result_conf
        if real_votes >= fake_votes:
            smoothed_label = "Real"
            label_confidence = real_votes / len(self.face_history[face_id]['result_history'])
        else:
            smoothed_label = "Fake"
            label_confidence = fake_votes / len(self.face_history[face_id]['result_history'])
        avg_confidence = (total_confidence / len(self.face_history[face_id]['result_history'])) * label_confidence
        self.face_history[face_id]['label'] = smoothed_label
        self.face_history[face_id]['confidence'] = avg_confidence
        return smoothed_label, avg_confidence
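
    # Worked example, assuming a buffer of [("Fake", 0.9), ("Fake", 0.8), ("Real", 0.6)]:
    # fake_votes = 2 beats real_votes = 1, so the label is "Fake" with
    # label_confidence = 2/3; the reported confidence is the mean raw confidence
    # (2.3 / 3) scaled by that agreement ratio, about 0.51. Ties go to "Real".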

    def process_video(self, video_path, stframe, status_text, progress_bar, detector_type="dnn", model_type="vit"):
        use_dnn_current = detector_type == "dnn" and self.use_dnn
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            st.error("Error: Cannot open video source")
            # Return zeroed stats so callers can still build a results table.
            return {"Real": 0, "Fake": 0, "Unknown": 0}
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Processing is capped at MAX_FRAMES to keep runtimes bounded; a webcam
        # stream (video_path == 0) has no meaningful total.
        MAX_FRAMES = 250
        total_frames = MAX_FRAMES if video_path != 0 else 0
        if video_path != 0:
            status_text.text(f"Video Info: {frame_width}x{frame_height}, {fps:.1f} FPS (processing up to {total_frames} frames)")
        else:
            status_text.text(f"Webcam: {frame_width}x{frame_height}")
        # Reset per-run tracking state.
        self.face_history = {}
        self.next_face_id = 0
        self.processing_times = deque(maxlen=30)
        frame_count = 0
        process_every_n_frames = 2
        face_stats = {"Real": 0, "Fake": 0, "Unknown": 0}

        while True:
            start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                status_text.text("End of video reached")
                break
            frame_count += 1
            if frame_count == MAX_FRAMES:
                st.success("Video Processed Successfully!")
                break
            if video_path != 0:
                progress = min(float(frame_count) / float(max(total_frames, 1)), 1.0)
                progress_bar.progress(progress)
            # Run detection only on every Nth frame; in-between frames reuse the
            # tracked positions drawn below.
            process_frame = (frame_count % process_every_n_frames == 0)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            if process_frame:
                faces = self.detect_faces_dnn(frame) if use_dnn_current else self.detect_faces_haar(frame)
                tracked_faces = self.track_faces(faces)
                face_images = []
                face_ids = []
                for face_id, (x, y, w, h, face_confidence) in tracked_faces:
                    if face_id in self.face_history and w > 20 and h > 20:
                        sx, sy, sw, sh = self.smooth_face_position(face_id)
                        face = frame_rgb[sy:sy+sh, sx:sx+sw]
                        if face.size > 0 and face.shape[0] >= 20 and face.shape[1] >= 20:
                            face_images.append(face)
                            face_ids.append(face_id)

                if face_images:
                    if model_type == "vit":
                        inputs = self.image_processor(images=face_images, return_tensors="pt")
                        with torch.no_grad():
                            outputs = self.model(**inputs)
                        logits = outputs.logits
                        probs = torch.nn.functional.softmax(logits, dim=1)
                        preds = torch.argmax(logits, dim=1)
                        for i, pred in enumerate(preds):
                            # Class index 1 is treated as "Real" for this checkpoint.
                            label = 'Real' if pred.item() == 1 else 'Fake'
                            confidence = probs[i][pred].item()
                            self.update_face_classification(face_ids[i], label, confidence)
                    elif model_type == "cnn" and self.cnn_detector.model is not None:
                        img_arrays = [cv2.resize(face, (128, 128)) / 255.0 for face in face_images]
                        img_batch = np.array(img_arrays)
                        predictions = self.cnn_detector.model.predict(img_batch, verbose=0)
                        for i, prediction in enumerate(predictions):
                            score = float(prediction[0])
                            # Sigmoid output: below 0.5 is "Real"; report confidence in the chosen label.
                            label = 'Real' if score < 0.5 else 'Fake'
                            confidence = score if label == 'Fake' else 1.0 - score
                            self.update_face_classification(face_ids[i], label, confidence)

            for face_id in self.face_history:
                if self.face_history[face_id]['ttl'] > 0:
                    sx, sy, sw, sh = self.smooth_face_position(face_id)
                    cv2.rectangle(frame, (sx, sy), (sx+sw, sy+sh), (0, 255, 255), 2)
                    label = self.face_history[face_id]['label'] or "Unknown"
                    confidence = self.face_history[face_id]['confidence']
                    result_text = f"{label}: {confidence:.2f}"
                    text_color = (0, 255, 0) if label == "Real" else (0, 0, 255)
                    cv2.rectangle(frame, (sx, sy+sh), (sx+len(result_text)*11, sy+sh+25), (0, 0, 0), -1)
                    cv2.putText(frame, result_text, (sx, sy+sh+20),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_color, 2)
                    cv2.putText(frame, f"ID:{face_id}", (sx, sy-5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)
                    if label in face_stats:
                        face_stats[label] += 1

            process_time = time.time() - start_time
            self.processing_times.append(process_time)
            avg_time = sum(self.processing_times) / len(self.processing_times)
            effective_fps = 1.0 / avg_time if avg_time > 0 else 0

            if video_path != 0:
                progress_percent = (frame_count / total_frames) * 100 if total_frames > 0 else 0
                cv2.putText(frame, f"Frame: {frame_count}/{total_frames} ({progress_percent:.1f}%)",
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            else:
                cv2.putText(frame, f"Frame: {frame_count}",
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            detector_name = "DNN" if use_dnn_current else "Haar Cascade"
            model_name = "ViT" if model_type == "vit" else "CNN"
            cv2.putText(frame, f"Detector: {detector_name} | Model: {model_name} | FPS: {effective_fps:.1f}",
                        (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            cv2.putText(frame, f"Tracked faces: {len(self.face_history)}",
                        (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            stframe.image(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), channels="RGB")
            status_text.text(f"Real: {face_stats['Real']} | Fake: {face_stats['Fake']} | FPS: {effective_fps:.1f}")
            if st.session_state.get('stop_button', False):
                break

        cap.release()
        return face_stats
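
# Note on the returned stats: face_stats counts one observation per tracked face
# per rendered frame, so "Real: 120" means 120 frame-level Real detections, not
# 120 distinct faces.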


def ensure_sample_video():
    sample_dir = "sample_videos"
    sample_path = os.path.join(sample_dir, "Sample.mp4")
    if not os.path.exists(sample_dir):
        os.makedirs(sample_dir)
    if not os.path.exists(sample_path):
        try:
            with st.spinner("Downloading sample video..."):
                sample_url = "https://storage.googleapis.com/deepfake-demo/sample_deepfake.mp4"
                with urllib.request.urlopen(sample_url) as response, open(sample_path, 'wb') as out_file:
                    shutil.copyfileobj(response, out_file)
            st.success("Sample video downloaded successfully!")
        except Exception as e:
            st.error(f"Failed to download sample video: {e}")
            return None
    return sample_path


def main():
    st.set_page_config(page_title="Deepfake Detector", layout="wide")
    st.title("Deepfake Detection App")
    st.markdown("""
    This app uses computer vision and deep learning to detect deepfake videos.
    Upload a video or use your webcam to detect if faces are real or manipulated.
    """)

    # Initialize session state on first run.
    if 'detector' not in st.session_state:
        st.session_state.detector = None
    if 'stop_button' not in st.session_state:
        st.session_state.stop_button = False
    if 'use_sample' not in st.session_state:
        st.session_state.use_sample = False
    if 'sample_path' not in st.session_state:
        st.session_state.sample_path = None

    if st.session_state.detector is None:
        st.session_state.detector = DeepfakeDetector()

    st.sidebar.title("Options")
    input_option = st.sidebar.radio("Select Input Source", ["Upload Video", "Use Webcam", "Try Sample Video"])
    detector_type = st.sidebar.selectbox("Face Detector", ["DNN (better for close-ups)", "Haar Cascade (faster)"],
                                         index=0 if st.session_state.detector.use_dnn else 1)
    detector_option = "dnn" if "DNN" in detector_type else "haar"
    # "vit" routes to the Hugging Face checkpoint loaded in load_vit_components();
    # "cnn" routes to the local Keras model ('cnn_model.h5').
    model_type = st.sidebar.selectbox("Deepfake Detection Model", ["Vision Transformer (ViT)", "F3 Net Model"], index=0)
    model_option = "vit" if "Vision" in model_type else "cnn"

    col1, col2 = st.columns([3, 1])
    with col1:
        video_placeholder = st.empty()
    with col2:
        status_text = st.empty()
        progress_bar = st.empty()
        st.subheader("Results")
        results_area = st.empty()
        if st.button("Stop Processing"):
            st.session_state.stop_button = True

    if input_option == "Upload Video":
        uploaded_file = st.sidebar.file_uploader("Choose a video file", type=["mp4", "avi", "mov", "mkv"])
        if uploaded_file is not None:
            st.session_state.stop_button = False
            # Persist the upload to a named temp file so OpenCV can open it by
            # path; close it before reading so this also works on Windows.
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1])
            tfile.write(uploaded_file.read())
            tfile.close()
            video_path = tfile.name
            face_stats = st.session_state.detector.process_video(video_path, video_placeholder, status_text,
                                                                 progress_bar, detector_option, model_option)
            results_df = pd.DataFrame({"Category": ["Real Faces", "Fake Faces"],
                                       "Count": [face_stats["Real"], face_stats["Fake"]]})
            results_area.dataframe(results_df)
            os.unlink(video_path)
    elif input_option == "Use Webcam":
        st.session_state.stop_button = False
        if st.sidebar.button("Start Webcam"):
            face_stats = st.session_state.detector.process_video(0, video_placeholder, status_text, progress_bar,
                                                                 detector_option, model_option)
            results_df = pd.DataFrame({"Category": ["Real Faces", "Fake Faces"],
                                       "Count": [face_stats["Real"], face_stats["Fake"]]})
            results_area.dataframe(results_df)
    elif input_option == "Try Sample Video":
        st.session_state.stop_button = False
        sample_path = ensure_sample_video()
        if sample_path and st.sidebar.button("Process Sample Video"):
            face_stats = st.session_state.detector.process_video(sample_path, video_placeholder, status_text,
                                                                 progress_bar, detector_option, model_option)
            results_df = pd.DataFrame({"Category": ["Real Faces", "Fake Faces"],
                                       "Count": [face_stats["Real"], face_stats["Fake"]]})
            results_area.dataframe(results_df)


if __name__ == "__main__":
    main()