"""Streamlit front end for the deepfake detection demo.

The user either uploads an image/video or picks one from ``sample_files``.
The script then runs ``extract_video.py`` and ``inference.py`` as
subprocesses and reports the fake probability parsed from the latter's
standard output.
"""

import base64
import json
import os
import re
import shlex
import shutil
import subprocess
import time
import uuid

import cv2
import numpy as np
import streamlit as st
from PIL import Image

# from extract_video import extract_method_single_video
from file_picker import st_file_selector

DEBUG = True

# Probabilities strictly above this value are reported as fake.
FAKE_THRESHOLD = 0.6


def _stage_upload(uploaded_file):
    """Save an uploaded file under ``temps/`` and preview it in the page.

    Args:
        uploaded_file: The object returned by ``st.file_uploader``.

    Returns:
        A ``(file_type, filepath, faces_folder)`` tuple, where ``file_type``
        is the major part of the upload's MIME type (e.g. ``"video"``).
    """
    random_id = uuid.uuid1()
    base_folder = "temps"
    mime_parts = uploaded_file.type.split("/")
    file_type = mime_parts[0]
    filepath = f"{base_folder}/{random_id}.{mime_parts[-1]}"
    faces_folder = f"{base_folder}/images/{random_id}"

    if uploaded_file.type == 'video/mp4':
        # getbuffer() exposes the full payload regardless of the current
        # stream position, matching what the image branch already did.
        with open(filepath, mode='wb') as f:
            f.write(uploaded_file.getbuffer())
        st.video(uploaded_file)
    else:
        # Decode first so an unreadable image fails before anything is saved.
        img = Image.open(uploaded_file).convert('RGB')
        with open(filepath, mode='wb') as f:
            f.write(uploaded_file.getbuffer())
        st.image(img)

    return file_type, filepath, faces_folder


def _stage_sample(selected_file):
    """Preview a file chosen from ``sample_files`` and derive its paths.

    Args:
        selected_file: Path of the chosen sample, relative to
            ``sample_files`` (as returned by ``st_file_selector``).

    Returns:
        A ``(file_type, filepath, faces_folder)`` tuple, where ``file_type``
        is ``"video"`` for ``.mp4`` files and ``"image"`` otherwise.
    """
    base_folder = "sample_files"
    extension = selected_file.split(".")[-1].lower()
    filepath = f"{base_folder}/{selected_file}"
    faces_folder = f"{base_folder}/images/" + selected_file.split(".")[0]

    # Read through a context manager so the handle is closed promptly.
    with open(filepath, 'rb') as sample:
        payload = sample.read()

    if extension == 'mp4':
        st.video(payload)
        return "video", filepath, faces_folder

    st.image(payload)
    return "image", filepath, faces_folder


def _extract_faces(filepath):
    """Run ``extract_video.py`` on *filepath* and return the completed process."""
    # An explicit argv list keeps paths containing quotes or backslashes intact.
    command = [
        "python", "extract_video.py",
        "--device", "cpu",
        "--max_frames", "50",
        "--bs", "2",
        "--frame_interval", "60",
        "--confidence_threshold", "0.997",
        "--data_path", filepath,
    ]
    return subprocess.run(command, capture_output=True)


def _run_inference(faces_folder):
    """Run ``inference.py`` on *faces_folder* and return the completed process."""
    command = [
        "python", "inference.py",
        "--weight", "weights/model_params_ffpp_c23.pickle",
        "--device", "cpu",
        "--image_folder", faces_folder,
    ]
    return subprocess.run(command, capture_output=True)


def main():
    """Render the page, run the two-stage pipeline, and show the verdict."""
    st.markdown("###")
    uploaded_file = st.file_uploader(
        'Upload a picture',
        type=['mp4', 'jpg', 'jpeg', 'png'],
        accept_multiple_files=False,
    )

    # Block until the sample directory exists; nothing in this script
    # creates it, so it is presumably provisioned externally.
    with st.spinner('Loading samples...'):
        while not os.path.isdir("sample_files"):
            time.sleep(1)

    st.markdown("### or")
    selected_file = st_file_selector(
        st,
        path='sample_files',
        key='selected',
        label='Choose a sample image/video',
    )

    # An upload takes precedence over a selected sample.
    if uploaded_file:
        file_type, filepath, faces_folder = _stage_upload(uploaded_file)
    elif selected_file:
        file_type, filepath, faces_folder = _stage_sample(selected_file)
    else:
        return

    with st.spinner(f'Processing {file_type}...'):
        processing_stdout = _extract_faces(filepath)
    st.text(f'1. Processing {file_type} ✅')

    with st.spinner(f'Analyzing {file_type}...'):
        analyze_stdout = _run_inference(faces_folder)
    st.text(f'2. Analyzing {file_type} ✅')

    # A missing folder is handled like an empty one instead of letting
    # os.listdir raise FileNotFoundError.
    if not os.path.isdir(faces_folder):
        st.text("No faces could be detected! 🚨")
        if DEBUG:
            # The folder was never created, so surface the extractor's result.
            st.text(processing_stdout)
        return
    if not os.listdir(faces_folder):
        st.text("No faces could be detected! 🚨")
        return

    try:
        # The score is taken from whatever follows the last
        # 'Mean prediction: ' marker in inference.py's stdout.
        inference_output = analyze_stdout.stdout.decode('utf-8')
        fake_probability = float(inference_output.split('Mean prediction: ')[-1])

        if fake_probability > FAKE_THRESHOLD:
            st.error(' FAKE! ', icon="🚨")
        else:
            st.success(" REAL FOOTAGE! ", icon="✅")

        st.text("fake probability {:.2f}".format(fake_probability))

        # NOTE(review): only the faces folder is removed; the staged upload
        # in temps/ is left on disk (its removal was disabled in the original).
        shutil.rmtree(faces_folder)
    except Exception as e:
        if DEBUG:
            st.text(processing_stdout.stdout.decode('utf-8'))
            st.text(analyze_stdout.stdout.decode('utf-8'))

            st.text("")

            st.text(processing_stdout)
            st.text(analyze_stdout)
            st.write(e)
        else:
            st.text("Encountered a problem while analyzing video/image 🚨")


def setup():
    """Create the ``temps`` working directory if it does not exist yet."""
    os.makedirs("temps", exist_ok=True)


if __name__ == "__main__":
    st.set_page_config(
        page_title="Nodeflux Deepfake Detection",
        page_icon=":pencil2:"
    )

    st.title("Deepfake Detection")
    setup()
    main()