# credify/app/api/forgery_routes.py
# Author: abhisheksan
# Commit be986b5: Refactor face manipulation detection in forgery_routes.py
# and improve error handling in forgery_image_utils.py
import logging
import os
import traceback
from urllib.parse import urlparse

import numpy as np
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.services.audio_deepfake_service import AudioDeepfakeService
from app.services.deepfake_video_detection import DeepfakeVideoDetectionService
from app.services.face_manipulation_service import FaceManipulationService
from app.services.gan_detection_service import GANDetectionService
from app.services.image_manipulation_service import ImageManipulationService
from app.utils.file_utils import download_file, remove_temp_file, get_file_content
from app.utils.forgery_image_utils import detect_face
from app.utils.forgery_video_utils import extract_audio, extract_frames, compress_and_process_video, detect_speech
router = APIRouter()
class DetectForgeryRequest(BaseModel):
file_url: str
# Initialize services
image_manipulation_service = ImageManipulationService()
face_manipulation_service = FaceManipulationService()
audio_deepfake_service = AudioDeepfakeService()
gan_detection_service = GANDetectionService()
deepfake_video_detection_service = DeepfakeVideoDetectionService()
def parse_confidence(value):
if isinstance(value, str):
return float(value.rstrip('%')) / 100
return float(value)
def get_file_extension(url: str) -> str:
_, ext = os.path.splitext(url)
return ext.lstrip('.').lower()
@router.post("/detect_forgery")
async def detect_forgery(request: DetectForgeryRequest):
file_url = request.file_url
logging.info(f"Received forgery detection request for file: {file_url}")
firebase_filename = None
try:
file_extension = get_file_extension(file_url)
logging.info(f"Detected file extension: {file_extension}")
firebase_filename = await download_file(file_url)
logging.info(f"File downloaded and saved as: {firebase_filename}")
if file_extension in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'gif', 'tiff', 'webp']:
logging.info(f"Processing image file: {firebase_filename}")
return await process_image(firebase_filename)
elif file_extension in ['mp4', 'avi', 'mov', 'flv', 'wmv']:
logging.info(f"Processing video file: {firebase_filename}")
return await process_video(firebase_filename)
else:
logging.error(f"Unsupported file type: {file_extension} (URL: {file_url})")
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")
except Exception as e:
logging.error(f"Error processing file: {e}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail="An error occurred while processing the file.")
finally:
if firebase_filename:
logging.info(f"Removing temporary file: {firebase_filename}")
await remove_temp_file(firebase_filename)
async def process_image(firebase_filename: str):
logging.info(f"Starting image processing for: {firebase_filename}")
image_content = get_file_content(firebase_filename)
has_face = detect_face(image_content)
logging.info(f"Face detection result for {firebase_filename}: {'Face detected' if has_face else 'No face detected'}")
results = {
"image_manipulation": image_manipulation_service.detect_manipulation(firebase_filename),
"gan_detection": gan_detection_service.detect_gan(firebase_filename)
}
logging.info(f"Image manipulation detection result: {results['image_manipulation']}")
logging.info(f"GAN detection result: {results['gan_detection']}")
if has_face:
results["face_manipulation"] = face_manipulation_service.detect_manipulation(firebase_filename)
logging.info(f"Face manipulation detection result: {results['face_manipulation']}")
else:
results["face_manipulation"] = {
"is_manipulated": False,
"confidence": "0%"
}
logging.info("Face manipulation detection skipped (no face detected)")
logging.info(f"Image processing completed for: {firebase_filename}")
return results
def convert_to_python_types(obj):
if isinstance(obj, np.generic):
return obj.item()
elif isinstance(obj, (list, tuple)):
return [convert_to_python_types(item) for item in obj]
elif isinstance(obj, dict):
return {key: convert_to_python_types(value) for key, value in obj.items()}
elif isinstance(obj, np.ndarray):
return obj.tolist()
return obj
async def process_video(firebase_filename: str):
logging.info(f"Starting video processing for: {firebase_filename}")
try:
compressed_video_filename = await compress_and_process_video(firebase_filename)
logging.info(f"Video compressed: {compressed_video_filename}")
audio_filename = await extract_audio(compressed_video_filename)
is_audio_deepfake = False
if audio_filename:
logging.info(f"Audio extracted successfully: {audio_filename}")
audio_content = get_file_content(audio_filename)
if detect_speech(audio_content):
logging.info("Speech detected in the audio")
# Audio deepfake detection logic here if needed
else:
logging.info("No speech detected in the audio")
await remove_temp_file(audio_filename)
logging.info(f"Temporary audio file removed: {audio_filename}")
else:
logging.info("No audio detected or extracted from the video")
results = {"is_audio_deepfake": is_audio_deepfake}
frames = await extract_frames(compressed_video_filename)
logging.info(f"Frames extracted: {len(frames)} frames")
results.update({
"image_manipulation": {
"collective_detection": False,
"collective_confidence": 0.0
},
"face_manipulation": {
"collective_detection": False,
"collective_confidence": 0.0
},
"gan_detection": {
"collective_detection": False,
"collective_confidence": 0.0
}
})
face_frames = []
img_manip_detections = []
img_manip_confidences = []
gan_detections = []
gan_confidences = []
for frame in frames:
frame_content = get_file_content(frame)
has_face = detect_face(frame_content)
if has_face:
face_frames.append(frame)
img_manip_result = image_manipulation_service.detect_manipulation(frame)
gan_result = gan_detection_service.detect_gan(frame)
img_manip_detections.append(img_manip_result.get("is_manipulated", False))
img_manip_confidences.append(parse_confidence(img_manip_result.get("confidence", "0%")))
gan_detections.append(gan_result.get("is_gan", False))
gan_confidences.append(parse_confidence(gan_result.get("confidence", "0%")))
# Aggregate results for image manipulation and GAN detection
results["image_manipulation"]["collective_detection"] = any(img_manip_detections)
results["image_manipulation"]["collective_confidence"] = sum(img_manip_confidences) / len(img_manip_confidences) if img_manip_confidences else 0.0
results["gan_detection"]["collective_detection"] = any(gan_detections)
results["gan_detection"]["collective_confidence"] = sum(gan_confidences) / len(gan_confidences) if gan_confidences else 0.0
# Perform deepfake detection if faces were detected
if face_frames:
deepfake_result = deepfake_video_detection_service.detect_deepfake(face_frames)
deepfake_result = convert_to_python_types(deepfake_result)
results["face_manipulation"] = {
"collective_detection": bool(deepfake_result["is_deepfake"]),
"collective_confidence": deepfake_result['confidence']
}
logging.info(f"Aggregated results: {results}")
await remove_temp_file(compressed_video_filename)
for frame in frames:
await remove_temp_file(frame)
logging.info(f"Temporary files removed")
logging.info(f"Video processing completed for: {firebase_filename}")
return results
except Exception as e:
logging.error(f"Error processing video: {e}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=f"An error occurred while processing the video: {str(e)}")