Spaces:
Running
Running
Refactor face manipulation detection in forgery_routes.py and improve error handling in forgery_image_utils.py
be986b5
from fastapi import APIRouter, HTTPException | |
from app.services.image_manipulation_service import ImageManipulationService | |
from app.services.face_manipulation_service import FaceManipulationService | |
from app.services.audio_deepfake_service import AudioDeepfakeService | |
from app.services.gan_detection_service import GANDetectionService | |
from app.utils.file_utils import download_file, remove_temp_file, get_file_content | |
from app.utils.forgery_image_utils import detect_face | |
from app.utils.forgery_video_utils import extract_audio, extract_frames, compress_and_process_video, detect_speech # Adjust the import path if necessary | |
from app.services.deepfake_video_detection import DeepfakeVideoDetectionService | |
import os | |
import numpy as np | |
import logging | |
import traceback | |
from pydantic import BaseModel | |
router = APIRouter() | |
class DetectForgeryRequest(BaseModel): | |
file_url: str | |
# Initialize services | |
image_manipulation_service = ImageManipulationService() | |
face_manipulation_service = FaceManipulationService() | |
audio_deepfake_service = AudioDeepfakeService() | |
gan_detection_service = GANDetectionService() | |
deepfake_video_detection_service = DeepfakeVideoDetectionService() | |
def parse_confidence(value): | |
if isinstance(value, str): | |
return float(value.rstrip('%')) / 100 | |
return float(value) | |
def get_file_extension(url: str) -> str: | |
_, ext = os.path.splitext(url) | |
return ext.lstrip('.').lower() | |
async def detect_forgery(request: DetectForgeryRequest): | |
file_url = request.file_url | |
logging.info(f"Received forgery detection request for file: {file_url}") | |
firebase_filename = None | |
try: | |
file_extension = get_file_extension(file_url) | |
logging.info(f"Detected file extension: {file_extension}") | |
firebase_filename = await download_file(file_url) | |
logging.info(f"File downloaded and saved as: {firebase_filename}") | |
if file_extension in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'gif', 'tiff', 'webp']: | |
logging.info(f"Processing image file: {firebase_filename}") | |
return await process_image(firebase_filename) | |
elif file_extension in ['mp4', 'avi', 'mov', 'flv', 'wmv']: | |
logging.info(f"Processing video file: {firebase_filename}") | |
return await process_video(firebase_filename) | |
else: | |
logging.error(f"Unsupported file type: {file_extension} (URL: {file_url})") | |
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}") | |
except Exception as e: | |
logging.error(f"Error processing file: {e}") | |
logging.error(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail="An error occurred while processing the file.") | |
finally: | |
if firebase_filename: | |
logging.info(f"Removing temporary file: {firebase_filename}") | |
await remove_temp_file(firebase_filename) | |
async def process_image(firebase_filename: str): | |
logging.info(f"Starting image processing for: {firebase_filename}") | |
image_content = get_file_content(firebase_filename) | |
has_face = detect_face(image_content) | |
logging.info(f"Face detection result for {firebase_filename}: {'Face detected' if has_face else 'No face detected'}") | |
results = { | |
"image_manipulation": image_manipulation_service.detect_manipulation(firebase_filename), | |
"gan_detection": gan_detection_service.detect_gan(firebase_filename) | |
} | |
logging.info(f"Image manipulation detection result: {results['image_manipulation']}") | |
logging.info(f"GAN detection result: {results['gan_detection']}") | |
if has_face: | |
results["face_manipulation"] = face_manipulation_service.detect_manipulation(firebase_filename) | |
logging.info(f"Face manipulation detection result: {results['face_manipulation']}") | |
else: | |
results["face_manipulation"] = { | |
"is_manipulated": False, | |
"confidence": "0%" | |
} | |
logging.info("Face manipulation detection skipped (no face detected)") | |
logging.info(f"Image processing completed for: {firebase_filename}") | |
return results | |
def convert_to_python_types(obj): | |
if isinstance(obj, np.generic): | |
return obj.item() | |
elif isinstance(obj, (list, tuple)): | |
return [convert_to_python_types(item) for item in obj] | |
elif isinstance(obj, dict): | |
return {key: convert_to_python_types(value) for key, value in obj.items()} | |
elif isinstance(obj, np.ndarray): | |
return obj.tolist() | |
return obj | |
async def process_video(firebase_filename: str): | |
logging.info(f"Starting video processing for: {firebase_filename}") | |
try: | |
compressed_video_filename = await compress_and_process_video(firebase_filename) | |
logging.info(f"Video compressed: {compressed_video_filename}") | |
audio_filename = await extract_audio(compressed_video_filename) | |
is_audio_deepfake = False | |
if audio_filename: | |
logging.info(f"Audio extracted successfully: {audio_filename}") | |
audio_content = get_file_content(audio_filename) | |
if detect_speech(audio_content): | |
logging.info("Speech detected in the audio") | |
# Audio deepfake detection logic here if needed | |
else: | |
logging.info("No speech detected in the audio") | |
await remove_temp_file(audio_filename) | |
logging.info(f"Temporary audio file removed: {audio_filename}") | |
else: | |
logging.info("No audio detected or extracted from the video") | |
results = {"is_audio_deepfake": is_audio_deepfake} | |
frames = await extract_frames(compressed_video_filename) | |
logging.info(f"Frames extracted: {len(frames)} frames") | |
results.update({ | |
"image_manipulation": { | |
"collective_detection": False, | |
"collective_confidence": 0.0 | |
}, | |
"face_manipulation": { | |
"collective_detection": False, | |
"collective_confidence": 0.0 | |
}, | |
"gan_detection": { | |
"collective_detection": False, | |
"collective_confidence": 0.0 | |
} | |
}) | |
face_frames = [] | |
img_manip_detections = [] | |
img_manip_confidences = [] | |
gan_detections = [] | |
gan_confidences = [] | |
for frame in frames: | |
frame_content = get_file_content(frame) | |
has_face = detect_face(frame_content) | |
if has_face: | |
face_frames.append(frame) | |
img_manip_result = image_manipulation_service.detect_manipulation(frame) | |
gan_result = gan_detection_service.detect_gan(frame) | |
img_manip_detections.append(img_manip_result.get("is_manipulated", False)) | |
img_manip_confidences.append(parse_confidence(img_manip_result.get("confidence", "0%"))) | |
gan_detections.append(gan_result.get("is_gan", False)) | |
gan_confidences.append(parse_confidence(gan_result.get("confidence", "0%"))) | |
# Aggregate results for image manipulation and GAN detection | |
results["image_manipulation"]["collective_detection"] = any(img_manip_detections) | |
results["image_manipulation"]["collective_confidence"] = sum(img_manip_confidences) / len(img_manip_confidences) if img_manip_confidences else 0.0 | |
results["gan_detection"]["collective_detection"] = any(gan_detections) | |
results["gan_detection"]["collective_confidence"] = sum(gan_confidences) / len(gan_confidences) if gan_confidences else 0.0 | |
# Perform deepfake detection if faces were detected | |
if face_frames: | |
deepfake_result = deepfake_video_detection_service.detect_deepfake(face_frames) | |
deepfake_result = convert_to_python_types(deepfake_result) | |
results["face_manipulation"] = { | |
"collective_detection": bool(deepfake_result["is_deepfake"]), | |
"collective_confidence": deepfake_result['confidence'] | |
} | |
logging.info(f"Aggregated results: {results}") | |
await remove_temp_file(compressed_video_filename) | |
for frame in frames: | |
await remove_temp_file(frame) | |
logging.info(f"Temporary files removed") | |
logging.info(f"Video processing completed for: {firebase_filename}") | |
return results | |
except Exception as e: | |
logging.error(f"Error processing video: {e}") | |
logging.error(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail=f"An error occurred while processing the video: {str(e)}") |