# deepdetection/src/api/main.py
# akagtag — feat: add metadata extraction and synthetic keyword detection
# for images and videos (commit 7250f83)
from __future__ import annotations
import asyncio
import io
import logging
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from PIL import ExifTags, Image
from src.engines.coherence.engine import CoherenceEngine
from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import MODEL_CANDIDATES, explain
from src.fusion.fuser import fuse
from src.services.hf_inference_client import HFInferenceClient, HFInferenceUnavailable
from src.services.inference_router import (
get_inference_backend,
is_runpod_configured,
route_inference,
)
from src.services.media_utils import extract_video_frames
from src.types import DetectionResponse, EngineResult
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Load local development environment values from .env when present.
load_dotenv()
def _is_test_mode() -> bool:
return (
os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower()
in {"1", "true", "yes", "on"}
or "PYTEST_CURRENT_TEST" in os.environ
or "pytest" in sys.modules
)
# Propagate test mode to engine constructors so they skip heavy model loads.
if _is_test_mode():
    os.environ.setdefault("GENAI_SKIP_MODEL_LOAD", "1")

app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
# Permissive CORS: the API is consumed by browser front-ends on arbitrary origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Engine singletons shared across requests; heavy models are loaded lazily
# via each engine's _ensure() (see preload/_ensure_models_loaded).
_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()
_hf = HFInferenceClient()

# Upload size limits (MB) and video frame cap, overridable via environment.
MAX_IMAGE_MB = int(os.environ.get("MAX_IMAGE_SIZE_MB", 20))
MAX_VIDEO_MB = int(os.environ.get("MAX_VIDEO_SIZE_MB", 100))
MAX_FRAMES = int(os.environ.get("MAX_VIDEO_FRAMES", 300))

# Accepted upload MIME types for each endpoint.
IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/bmp", "image/gif"}
VIDEO_TYPES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm", "video/avi"}

# Generator attribution labels the fusion/attribution layer may emit.
SUPPORTED_GENERATORS = [
    "real",
    "unknown_gan",
    "stable_diffusion",
    "midjourney",
    "dall_e",
    "flux",
    "firefly",
    "imagen",
]

# Substrings in filenames/metadata that strongly indicate synthetic origin.
SYNTHETIC_KEYWORDS = (
    "chatgpt",
    "gemini",
    "thispersondoesnotexist",
    "this person does not exist",
)
def _find_synthetic_keyword_hits(*texts: str) -> list[str]:
    """Return the synthetic-origin keywords found in any of *texts*.

    Matching is case-insensitive; results keep SYNTHETIC_KEYWORDS order
    with duplicates removed.
    """
    combined = " ".join(texts).lower()
    matched = [keyword for keyword in SYNTHETIC_KEYWORDS if keyword in combined]
    # dict.fromkeys dedupes while preserving first-seen order.
    return list(dict.fromkeys(matched))
def _collect_image_metadata_text(data: bytes) -> str:
    """Best-effort dump of an image's PIL info dict and EXIF tags.

    Returns 'key=value' pairs joined with ' | ', or "" when the bytes
    cannot be opened as an image. All metadata failures are swallowed.
    """
    try:
        image = Image.open(io.BytesIO(data))
    except Exception:
        return ""

    pieces: list[str] = []
    try:
        for name, value in image.info.items():
            if isinstance(value, bytes):
                # Truncate binary payloads so one huge blob can't dominate.
                pieces.append(f"{name}={value[:200]!r}")
            else:
                pieces.append(f"{name}={value}")
    except Exception:
        pass
    try:
        for tag_id, value in image.getexif().items():
            tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
            pieces.append(f"{tag_name}={value}")
    except Exception:
        pass
    return " | ".join(pieces)
def _collect_video_metadata_text(
    data: bytes,
    *,
    content_type: str | None,
    filename: str | None,
) -> str:
    """Probe uploaded video bytes with ffprobe and return its JSON output.

    Writes the bytes to a temp file with a container-matching suffix, runs
    ffprobe with a 15s timeout, and returns "" on any failure (missing
    binary, timeout, nonzero exit). The temp file is always removed.
    """
    with tempfile.NamedTemporaryFile(
        suffix=_video_temp_suffix(content_type, filename), delete=False
    ) as handle:
        handle.write(data)
        probe_path = handle.name
    try:
        proc = subprocess.run(
            [
                "ffprobe",
                "-v", "error",
                "-print_format", "json",
                "-show_format",
                "-show_streams",
                probe_path,
            ],
            capture_output=True,
            text=True,
            timeout=15,
            check=False,
        )
        if proc.returncode != 0:
            return ""
        return proc.stdout or ""
    except Exception:
        return ""
    finally:
        Path(probe_path).unlink(missing_ok=True)
def _apply_metadata_keyword_signal(
    response: DetectionResponse,
    *,
    filename: str | None,
    metadata_text: str,
) -> DetectionResponse:
    """Escalate a detection to FAKE when filename/metadata contain synthetic keywords.

    Returns *response* untouched when no keyword matches. Otherwise returns a
    deep copy with an extra high-confidence 'metadata_signal' engine entry,
    an amended explanation, and the overall verdict/confidence raised.
    """
    keyword_hits = _find_synthetic_keyword_hits(filename or "", metadata_text)
    if not keyword_hits:
        return response

    joined_hits = ", ".join(keyword_hits)
    updated = response.model_copy(deep=True)
    updated.engine_breakdown.append(
        EngineResult(
            engine="metadata_signal",
            verdict="FAKE",
            confidence=0.98,
            attributed_generator="unknown_gan",
            explanation=f"Filename/metadata contains synthetic keyword(s): {joined_hits}.",
            processing_time_ms=0.0,
        )
    )
    updated.explanation = (
        f"{updated.explanation} "
        f"Metadata signal detected keyword(s): {joined_hits}."
    )
    # Force a FAKE verdict with at least 0.85 confidence, never lowering it.
    if updated.verdict != "FAKE" or updated.confidence < 0.85:
        updated.verdict = "FAKE"
        updated.confidence = max(updated.confidence, 0.85)
    if updated.attributed_generator == "real":
        updated.attributed_generator = "unknown_gan"
    return updated
def _video_temp_suffix(content_type: str | None, filename: str | None) -> str:
"""Choose a temp suffix matching the uploaded container for better decoder compatibility."""
by_type = {
"video/mp4": ".mp4",
"video/quicktime": ".mov",
"video/x-msvideo": ".avi",
"video/webm": ".webm",
"video/avi": ".avi",
}
ctype = (content_type or "").split(";")[0].strip().lower()
if ctype in by_type:
return by_type[ctype]
ext = Path(filename or "").suffix.strip().lower()
if ext in {".mp4", ".mov", ".avi", ".webm"}:
return ext
return ".mp4"
def _model_inventory() -> dict[str, object]:
    """Describe the pretrained models and components backing each engine."""
    inventory: dict[str, object] = {}
    inventory["fingerprint"] = {
        "ensemble_detectors": [
            "Organika/sdxl-detector",
            "haywoodsloan/ai-image-detector-deploy",
            "dima806/deepfake_vs_real_image_detection",
        ],
        "ensemble_weights": [0.5, 0.3, 0.2],
        "attribution_model": "openai/clip-vit-large-patch14",
    }
    inventory["coherence"] = {
        "audio_deepfake_model": "disabled (visual-only coherence)",
        "facial_landmarks": "mediapipe FaceMesh/FaceLandmarker",
        "temporal_embedding": "facenet-pytorch InceptionResnetV1(vggface2) when available",
    }
    inventory["sstgnn"] = {
        "pretrained_hf_models": [
            "dima806/deepfake_vs_real_image_detection",
            "prithivMLmods/Deep-Fake-Detector-Model",
        ],
        "graph_component": "scipy.spatial.Delaunay + MediaPipe landmarks",
    }
    inventory["explainability"] = {
        "gemini_model_candidates": list(MODEL_CANDIDATES),
    }
    inventory["generator_labels"] = SUPPORTED_GENERATORS
    return inventory
@app.get("/", response_class=HTMLResponse)
async def root() -> HTMLResponse:
    """Serve a minimal landing page that points at the interactive docs."""
    landing = "<h1>GenAI-DeepDetect API</h1><p>See /docs</p>"
    return HTMLResponse(landing)
@app.on_event("startup")
async def preload() -> None:
    """Warm all engine models at application startup (skipped in test mode)."""
    if _is_test_mode():
        logger.info("Skipping startup preload in test mode")
        return
    logger.info("Preloading models...")
    # Keep model imports/loads sequential to avoid lazy-import race issues.
    for engine in (_fp, _co, _st):
        await asyncio.to_thread(engine._ensure)
    logger.info("Model preload complete")
@app.get("/health")
async def health() -> dict:
    """Liveness probe reporting version, engines, and inference routing state."""
    payload: dict = {
        "status": "ok",
        "version": "1.0.0",
        "engines": ["fingerprint", "coherence", "sstgnn"],
    }
    payload["inference_backend"] = get_inference_backend()
    payload["runpod_configured"] = is_runpod_configured()
    return payload
@app.get("/health/models")
async def health_models() -> dict[str, object]:
    """Return the pretrained model inventory used by each engine."""
    inventory = _model_inventory()
    return inventory
def _assign_processing_time(results: list[EngineResult], ms: float) -> list[EngineResult]:
for result in results:
result.processing_time_ms = round(ms, 2)
return results
def _fallback_explanation(verdict: str, confidence: float, generator: str) -> str:
return (
f"Content classified as {verdict} with {confidence:.0%} confidence. "
f"Attributed generator: {generator}."
)
def _hf_fake_score(preds: list[dict]) -> float:
if not preds:
return 0.5
fake_keywords = (
"fake",
"deepfake",
"generated",
"synthetic",
"artificial",
"ai",
"label_1",
"class_1",
"1",
)
real_keywords = ("real", "authentic", "human", "natural", "label_0", "class_0", "0")
fake_best = 0.0
real_best = 0.0
for pred in preds:
label = str(pred.get("label", "")).strip().lower()
score = float(pred.get("score", 0.0))
if any(keyword in label for keyword in fake_keywords):
fake_best = max(fake_best, score)
if any(keyword in label for keyword in real_keywords):
real_best = max(real_best, score)
if fake_best == 0.0 and real_best == 0.0:
top = preds[0] if preds else {}
top_label = str(top.get("label", "")).strip().lower()
top_score = float(top.get("score", 0.5))
if any(keyword in top_label for keyword in fake_keywords):
return float(np.clip(top_score, 0.0, 1.0))
if any(keyword in top_label for keyword in real_keywords):
return float(np.clip(1.0 - top_score, 0.0, 1.0))
return 0.5
if fake_best == 0.0:
return float(np.clip(1.0 - real_best, 0.0, 1.0))
return float(np.clip(fake_best, 0.0, 1.0))
def _hf_generator_label(preds: list[dict], verdict: str) -> str:
    """Map HF prediction labels onto a supported generator name.

    Non-FAKE verdicts always attribute to 'real'; otherwise the first
    supported generator mentioned in any label wins, defaulting to
    'unknown_gan'.
    """
    if verdict != "FAKE":
        return "real"
    label_text = " ".join(str(entry.get("label", "")).lower() for entry in preds)
    for name in SUPPORTED_GENERATORS:
        if name == "real":
            continue
        # Match both 'dall_e' and 'dall e' style spellings.
        if name in label_text or name.replace("_", " ") in label_text:
            return name
    return "unknown_gan"
def _build_hf_response(preds: list[dict], elapsed_ms: float, media_type: str) -> DetectionResponse:
    """Assemble a DetectionResponse from raw HF serverless predictions."""
    fake_score = _hf_fake_score(preds)
    verdict = "FAKE" if fake_score > 0.5 else "REAL"
    # Confidence always refers to the chosen verdict's side of the score.
    confidence = fake_score if verdict == "FAKE" else 1.0 - fake_score
    generator = _hf_generator_label(preds, verdict)
    top_label = str(preds[0].get("label", "unknown")) if preds else "unknown"
    explanation = (
        f"Hugging Face serverless ({media_type}) top label: {top_label}. "
        f"Classified as {verdict} with {confidence:.0%} confidence."
    )
    # The single-engine breakdown mirrors the top-level fields exactly.
    shared_fields = dict(
        verdict=verdict,
        confidence=confidence,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
    )
    breakdown = [EngineResult(engine="hf_serverless", **shared_fields)]
    return DetectionResponse(engine_breakdown=breakdown, **shared_fields)
async def _hf_detect_image(data: bytes) -> DetectionResponse:
    """Classify image bytes via the HF serverless endpoint and wrap the result."""
    started = time.monotonic()
    preds = await _hf.classify_image(data, timeout=45.0)
    took_ms = (time.monotonic() - started) * 1000
    return _build_hf_response(preds, took_ms, media_type="image")
async def _hf_detect_video(
    data: bytes,
    *,
    content_type: str | None = None,
    filename: str | None = None,
) -> DetectionResponse:
    """Classify a video by running its first extracted frame through the HF image route.

    Raises:
        HTTPException: 422 when the video cannot be decoded or yields no frames.
    """
    with tempfile.NamedTemporaryFile(
        suffix=_video_temp_suffix(content_type, filename),
        delete=False,
    ) as handle:
        handle.write(data)
        video_path = handle.name
    try:
        try:
            frames = await asyncio.to_thread(extract_video_frames, video_path, MAX_FRAMES)
        except Exception as exc:
            raise HTTPException(status_code=422, detail=f"Video decode failed: {exc}") from exc
    finally:
        # Always clean up the temp container file, even on decode failure.
        Path(video_path).unlink(missing_ok=True)
    if not frames:
        raise HTTPException(status_code=422, detail="Could not extract frames")
    # Only the first (key)frame is classified — a cheap proxy for the whole video.
    frame_buffer = io.BytesIO()
    Image.fromarray(frames[0]).save(frame_buffer, format="JPEG")
    return await _hf_detect_image(frame_buffer.getvalue())
async def _ensure_models_loaded() -> None:
    """Lazily load all engine models off the event loop (no-op in test mode)."""
    if _is_test_mode():
        return
    # Sequential, threaded loads keep the event loop responsive.
    for engine in (_fp, _co, _st):
        await asyncio.to_thread(engine._ensure)
@app.post("/detect/image", response_model=DetectionResponse)
async def detect_image(file: UploadFile = File(...)) -> DetectionResponse:
    """Detect whether an uploaded image is AI-generated.

    Routing order: HF serverless (with RunPod fallback) when backend == "hf";
    RunPod when backend == "runpod" and configured (falling back to local on
    error); otherwise the local three-engine ensemble. Filename/metadata
    keyword hits can escalate the final verdict to FAKE.

    Raises:
        HTTPException: 415 unsupported content type, 413 too large,
            422 undecodable image, 503 when all remote backends fail.
    """
    t0 = time.monotonic()
    # Normalize the MIME type (drop parameters like '; charset=...').
    content_type = (file.content_type or "").split(";")[0].strip().lower()
    if content_type not in IMAGE_TYPES:
        raise HTTPException(status_code=415, detail=f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_IMAGE_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large")
    # Collected up front so every routing branch can apply the keyword signal.
    metadata_text = _collect_image_metadata_text(data)
    backend = get_inference_backend()
    if backend == "hf" and not _is_test_mode():
        try:
            response = await _hf_detect_image(data)
            return _apply_metadata_keyword_signal(
                response,
                filename=file.filename,
                metadata_text=metadata_text,
            )
        except HFInferenceUnavailable as exc:
            logger.warning("HF image route failed, trying RunPod fallback: %s", exc)
        except Exception as exc:
            logger.warning("HF image route unexpected error, trying RunPod fallback: %s", exc)
        # HF failed: try RunPod, otherwise surface a 503 (no local fallback here).
        if is_runpod_configured():
            try:
                return await route_inference(data, "image")
            except Exception as exc:
                raise HTTPException(
                    status_code=503,
                    detail=f"Hugging Face and RunPod failed for image inference: {exc}",
                ) from exc
        raise HTTPException(
            status_code=503,
            detail="Hugging Face inference failed and RunPod is not configured.",
        )
    if (
        backend == "runpod"
        and not _is_test_mode()
        and is_runpod_configured()
    ):
        try:
            return await route_inference(data, "image")
        except Exception as exc:
            # RunPod errors fall through to the local ensemble below.
            logger.warning("RunPod image route failed, falling back to local image inference: %s", exc)
    try:
        image = Image.open(io.BytesIO(data)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=422, detail=f"Could not decode image: {exc}") from exc
    await _ensure_models_loaded()
    # Run all three engines concurrently in worker threads.
    fp, co, st = await asyncio.gather(
        asyncio.to_thread(_fp.run, image),
        asyncio.to_thread(_co.run, image),
        asyncio.to_thread(_st.run, image),
    )
    elapsed_ms = (time.monotonic() - t0) * 1000
    engine_results = _assign_processing_time([fp, co, st], elapsed_ms)
    verdict, conf, generator = fuse(engine_results, is_video=False)
    if _is_test_mode():
        explanation = _fallback_explanation(verdict, conf, generator)
    else:
        explanation = await asyncio.to_thread(explain, verdict, conf, engine_results, generator)
    response = DetectionResponse(
        verdict=verdict,
        confidence=conf,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
        engine_breakdown=engine_results,
    )
    return _apply_metadata_keyword_signal(
        response,
        filename=file.filename,
        metadata_text=metadata_text,
    )
@app.post("/detect/video", response_model=DetectionResponse)
async def detect_video(file: UploadFile = File(...)) -> DetectionResponse:
    """Detect whether an uploaded video is AI-generated.

    Routing order: HF serverless (with RunPod fallback) when backend == "hf";
    RunPod when backend == "runpod" or when backend == "auto" and the upload
    exceeds 20 MB; otherwise the local three-engine ensemble over extracted
    frames. Filename/ffprobe metadata keyword hits can escalate to FAKE.

    Raises:
        HTTPException: 415 unsupported content type, 413 too large,
            422 undecodable/no frames, 503 when remote backends or the
            local engines fail.
    """
    t0 = time.monotonic()
    # Normalize the MIME type (drop parameters like '; codecs=...').
    content_type = (file.content_type or "").split(";")[0].strip().lower()
    if content_type not in VIDEO_TYPES:
        raise HTTPException(status_code=415, detail=f"Unsupported type: {file.content_type}")
    data = await file.read()
    if len(data) > MAX_VIDEO_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large")
    # Collected up front (via ffprobe) so every routing branch can apply it.
    metadata_text = _collect_video_metadata_text(
        data,
        content_type=file.content_type,
        filename=file.filename,
    )
    backend = get_inference_backend()
    if backend == "hf" and not _is_test_mode():
        try:
            response = await _hf_detect_video(
                data,
                content_type=file.content_type,
                filename=file.filename,
            )
            return _apply_metadata_keyword_signal(
                response,
                filename=file.filename,
                metadata_text=metadata_text,
            )
        except HFInferenceUnavailable as exc:
            logger.warning("HF video route failed, trying RunPod fallback: %s", exc)
        except Exception as exc:
            logger.warning("HF video route unexpected error, trying RunPod fallback: %s", exc)
        # HF failed: try RunPod, otherwise surface a 503 (no local fallback here).
        if is_runpod_configured():
            try:
                return await route_inference(data, "video")
            except Exception as exc:
                raise HTTPException(
                    status_code=503,
                    detail=f"Hugging Face and RunPod failed for video inference: {exc}",
                ) from exc
        raise HTTPException(
            status_code=503,
            detail="Hugging Face inference failed and RunPod is not configured.",
        )
    # Large uploads on "auto" are offloaded to RunPod (20 MB threshold).
    should_try_runpod = (
        backend == "runpod"
        or (backend == "auto" and len(data) > 20 * 1024 * 1024)
    )
    if should_try_runpod and not _is_test_mode() and is_runpod_configured():
        try:
            return await route_inference(data, "video")
        except Exception as exc:
            # RunPod errors fall through to the local ensemble below.
            logger.warning("RunPod route failed, falling back to local video inference: %s", exc)
    with tempfile.NamedTemporaryFile(
        suffix=_video_temp_suffix(file.content_type, file.filename),
        delete=False,
    ) as tmp:
        tmp.write(data)
        tmp_path = tmp.name
    try:
        try:
            frames = await asyncio.to_thread(extract_video_frames, tmp_path, MAX_FRAMES)
        except Exception as exc:
            raise HTTPException(status_code=422, detail=f"Video decode failed: {exc}") from exc
    finally:
        # Always remove the temp container file, even on decode failure.
        Path(tmp_path).unlink(missing_ok=True)
    if not frames:
        raise HTTPException(status_code=422, detail="Could not extract frames")
    await _ensure_models_loaded()
    try:
        # Run all three engines concurrently in worker threads.
        fp, co, st = await asyncio.gather(
            asyncio.to_thread(_fp.run_video, frames),
            asyncio.to_thread(_co.run_video, frames),
            asyncio.to_thread(_st.run_video, frames),
        )
    except Exception as exc:
        logger.exception("Video engine inference failed")
        raise HTTPException(status_code=503, detail=f"Video analysis failed: {exc}") from exc
    elapsed_ms = (time.monotonic() - t0) * 1000
    engine_results = _assign_processing_time([fp, co, st], elapsed_ms)
    verdict, conf, generator = fuse(engine_results, is_video=True)
    if _is_test_mode():
        explanation = _fallback_explanation(verdict, conf, generator)
    else:
        explanation = await asyncio.to_thread(explain, verdict, conf, engine_results, generator)
    response = DetectionResponse(
        verdict=verdict,
        confidence=conf,
        attributed_generator=generator,
        explanation=explanation,
        processing_time_ms=elapsed_ms,
        engine_breakdown=engine_results,
    )
    return _apply_metadata_keyword_signal(
        response,
        filename=file.filename,
        metadata_text=metadata_text,
    )