deepdefend-api / analysis /video_analyser.py
nishchandel's picture
Initial deployment without models
60efa5a
import cv2
import torch
import numpy as np
from PIL import Image
from typing import List, Dict
from collections import Counter
from models.load_models import model_loader
class VideoAnalyzer:
"""Simple, reliable video analyzer for hackathon demo"""
def __init__(self):
self.model, self.processor = model_loader.load_video_model()
self.device = model_loader.get_device()
self.face_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)
def detect_face(self, frame: np.ndarray) -> Dict:
"""Detect face in frame"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
if len(faces) > 0:
x, y, w, h = max(faces, key=lambda f: f[2] * f[3])
face_crop = frame[y:y+h, x:x+w]
return {
'detected': True,
'bbox': {'x': int(x), 'y': int(y), 'w': int(w), 'h': int(h)},
'face_crop': face_crop
}
return {'detected': False, 'bbox': None, 'face_crop': None}
def predict_deepfake(self, frame: np.ndarray) -> Dict:
"""Predict if frame is deepfake"""
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(frame_rgb)
inputs = self.processor(images=pil_img, return_tensors="pt")
if self.device == "cuda":
inputs = {k: v.cuda() for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model(**inputs)
logits = outputs.logits
probs = torch.nn.functional.softmax(logits, dim=-1)
fake_prob = probs[0][1].item() if probs.shape[1] > 1 else probs[0][0].item()
confidence = max(probs[0]).item()
return {
'fake_score': round(fake_prob, 3),
'confidence': round(confidence, 3),
'label': 'fake' if fake_prob > 0.5 else 'real'
}
def detect_suspicious_regions(self, face: np.ndarray, fake_score: float) -> List[str]:
try:
gray = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
h, w = gray.shape
suspicious_regions = []
regions = {
'eyes': (int(h*0.25), int(h*0.45), int(w*0.15), int(w*0.85)),
'nose': (int(h*0.40), int(h*0.65), int(w*0.35), int(w*0.65)),
'mouth': (int(h*0.60), int(h*0.80), int(w*0.30), int(w*0.70)),
'forehead': (int(h*0.08), int(h*0.28), int(w*0.25), int(w*0.75)),
'cheeks': (int(h*0.45), int(h*0.70), int(w*0.15), int(w*0.85)),
'chin': (int(h*0.75), int(h*0.95), int(w*0.30), int(w*0.70))
}
for region_name, (y1, y2, x1, x2) in regions.items():
region = gray[y1:y2, x1:x2]
if region.size == 0:
continue
suspicious = False
variance = np.var(region)
if variance < 200 or variance > 8000:
suspicious = True
edges = cv2.Canny(region, 50, 150)
edge_density = np.sum(edges > 0) / edges.size
if edge_density < 0.05:
suspicious = True
if fake_score > 0.7 and variance < 400:
suspicious = True
if suspicious:
suspicious_regions.append(region_name)
left_half = gray[:, :w//2]
right_half = np.fliplr(gray[:, w//2:])
min_width = min(left_half.shape[1], right_half.shape[1])
left_half = left_half[:, :min_width]
right_half = right_half[:, :min_width]
symmetry_diff = np.mean(np.abs(left_half.astype(float) - right_half.astype(float)))
if symmetry_diff < 10:
suspicious_regions.append('unnatural_symmetry')
return suspicious_regions if suspicious_regions else ['none']
except Exception as e:
print(f"Region detection error: {e}")
return ['analysis_error']
def analyze_interval(self, interval_data: Dict) -> Dict:
"""Analyze all frames in an interval"""
frames_data = interval_data['video_data']
if not frames_data:
return {
'interval_id': interval_data['interval_id'],
'interval': interval_data['interval'],
'fake_score': 0.0,
'confidence': 0.0,
'suspicious_regions': [],
'face_detected': False,
'frame_results': []
}
frame_results = []
total_fake_score = 0
faces_detected = 0
all_regions = []
for frame_data in frames_data:
frame = frame_data['frame']
timestamp = frame_data['timestamp']
face_info = self.detect_face(frame)
if face_info['detected']:
faces_detected += 1
pred = self.predict_deepfake(face_info['face_crop'])
regions = self.detect_suspicious_regions(face_info['face_crop'], pred['fake_score'])
else:
pred = self.predict_deepfake(frame)
regions = ['no_face_detected']
total_fake_score += pred['fake_score']
all_regions.extend(regions)
frame_results.append({
'timestamp': timestamp,
'fake_score': pred['fake_score'],
'confidence': pred['confidence'],
'face_detected': face_info['detected'],
'regions': regions
})
avg_fake_score = total_fake_score / len(frames_data)
region_counts = Counter(all_regions)
threshold = len(frames_data) * 0.5
consistent_regions = [
region for region, count in region_counts.items()
if count >= threshold and region not in ['none', 'no_face_detected', 'analysis_error']
]
return {
'interval_id': interval_data['interval_id'],
'interval': interval_data['interval'],
'fake_score': round(avg_fake_score, 3),
'confidence': round(np.mean([f['confidence'] for f in frame_results]), 3),
'suspicious_regions': consistent_regions if consistent_regions else list(set(all_regions)),
'face_detected': faces_detected > 0,
'frame_count': len(frames_data),
'frames_with_faces': faces_detected,
'frame_results': frame_results
}