from django.shortcuts import render, redirect import torch import torchvision from torchvision import transforms, models from torch.utils.data import DataLoader from torch.utils.data.dataset import Dataset import os import numpy as np import cv2 import matplotlib.pyplot as plt import face_recognition from torch.autograd import Variable import time import sys from torch import nn import json import glob import copy from torchvision import models import shutil from PIL import Image as pImage import time from django.conf import settings from .forms import VideoUploadForm index_template_name = 'index.html' predict_template_name = 'predict.html' im_size = 112 mean=[0.485, 0.456, 0.406] std=[0.229, 0.224, 0.225] sm = nn.Softmax() inv_normalize = transforms.Normalize(mean=-1*np.divide(mean,std),std=np.divide([1,1,1],std)) train_transforms = transforms.Compose([ transforms.ToPILImage(), transforms.Resize((im_size,im_size)), transforms.ToTensor(), transforms.Normalize(mean,std)]) class Model(nn.Module): def __init__(self, num_classes,latent_dim= 2048, lstm_layers=1 , hidden_dim = 2048, bidirectional = False): super(Model, self).__init__() model = models.resnext50_32x4d(pretrained = True) self.model = nn.Sequential(*list(model.children())[:-2]) self.lstm = nn.LSTM(latent_dim,hidden_dim, lstm_layers, bidirectional) self.relu = nn.LeakyReLU() self.dp = nn.Dropout(0.4) self.linear1 = nn.Linear(2048,num_classes) self.avgpool = nn.AdaptiveAvgPool2d(1) def forward(self, x): batch_size,seq_length, c, h, w = x.shape x = x.view(batch_size * seq_length, c, h, w) fmap = self.model(x) x = self.avgpool(fmap) x = x.view(batch_size,seq_length,2048) x_lstm,_ = self.lstm(x,None) return fmap,self.dp(self.linear1(x_lstm[:,-1,:])) class validation_dataset(Dataset): def __init__(self,video_names,sequence_length=60,transform = None): self.video_names = video_names self.transform = transform self.count = sequence_length def __len__(self): return len(self.video_names) def __getitem__(self,idx): video_path = self.video_names[idx] frames = [] a = int(100/self.count) first_frame = np.random.randint(0,a) for i,frame in enumerate(self.frame_extract(video_path)): #if(i % a == first_frame): faces = face_recognition.face_locations(frame) try: top,right,bottom,left = faces[0] frame = frame[top:bottom,left:right,:] except: pass frames.append(self.transform(frame)) if(len(frames) == self.count): break """ for i,frame in enumerate(self.frame_extract(video_path)): if(i % a == first_frame): frames.append(self.transform(frame)) """ # if(len(frames) 1: accuracy = [] for i in sequence_model: acc = i.split("_")[1] accuracy.append(acc) max_index = accuracy.index(max(accuracy)) final_model = sequence_model[max_index] else: final_model = sequence_model[0] return final_model ALLOWED_VIDEO_EXTENSIONS = set(['mp4','gif','webm','avi','3gp','wmv','flv','mkv']) def allowed_video_file(filename): #print("filename" ,filename.rsplit('.',1)[1].lower()) if (filename.rsplit('.',1)[1].lower() in ALLOWED_VIDEO_EXTENSIONS): return True else: return False def index(request): if request.method == 'GET': video_upload_form = VideoUploadForm() if 'file_name' in request.session: del request.session['file_name'] if 'preprocessed_images' in request.session: del request.session['preprocessed_images'] if 'faces_cropped_images' in request.session: del request.session['faces_cropped_images'] return render(request, index_template_name, {"form": video_upload_form}) else: video_upload_form = VideoUploadForm(request.POST, request.FILES) if video_upload_form.is_valid(): video_file = video_upload_form.cleaned_data['upload_video_file'] video_file_ext = video_file.name.split('.')[-1] sequence_length = video_upload_form.cleaned_data['sequence_length'] video_content_type = video_file.content_type.split('/')[0] if video_content_type in settings.CONTENT_TYPES: if video_file.size > int(settings.MAX_UPLOAD_SIZE): video_upload_form.add_error("upload_video_file", "Maximum file size 100 MB") return render(request, index_template_name, {"form": video_upload_form}) if sequence_length <= 0: video_upload_form.add_error("sequence_length", "Sequence Length must be greater than 0") return render(request, index_template_name, {"form": video_upload_form}) if allowed_video_file(video_file.name) == False: video_upload_form.add_error("upload_video_file","Only video files are allowed ") return render(request, index_template_name, {"form": video_upload_form}) saved_video_file = 'uploaded_file_'+str(int(time.time()))+"."+video_file_ext if settings.DEBUG: with open(os.path.join(settings.PROJECT_DIR, 'uploaded_videos', saved_video_file), 'wb') as vFile: shutil.copyfileobj(video_file, vFile) request.session['file_name'] = os.path.join(settings.PROJECT_DIR, 'uploaded_videos', saved_video_file) else: with open(os.path.join(settings.PROJECT_DIR, 'uploaded_videos','app','uploaded_videos', saved_video_file), 'wb') as vFile: shutil.copyfileobj(video_file, vFile) request.session['file_name'] = os.path.join(settings.PROJECT_DIR, 'uploaded_videos','app','uploaded_videos', saved_video_file) request.session['sequence_length'] = sequence_length return redirect('ml_app:predict') else: return render(request, index_template_name, {"form": video_upload_form}) def predict_page(request): if request.method == "GET": if 'file_name' not in request.session: return redirect("ml_app:home") if 'file_name' in request.session: video_file = request.session['file_name'] if 'sequence_length' in request.session: sequence_length = request.session['sequence_length'] path_to_videos = [video_file] video_file_name = video_file.split('\\')[-1] if settings.DEBUG == False: production_video_name = video_file_name.split('/')[3:] production_video_name = '/'.join([str(elem) for elem in production_video_name]) print("Production file name",production_video_name) video_file_name_only = video_file_name.split('.')[0] video_dataset = validation_dataset(path_to_videos, sequence_length=sequence_length,transform= train_transforms) model = Model(2).cuda() model_name = os.path.join(settings.PROJECT_DIR,'models', get_accurate_model(sequence_length)) models_location = os.path.join(settings.PROJECT_DIR,'models') path_to_model = os.path.join(settings.PROJECT_DIR, model_name) model.load_state_dict(torch.load(path_to_model)) model.eval() start_time = time.time() # Start: Displaying preprocessing images print("<=== | Started Videos Splitting | ===>") preprocessed_images = [] faces_cropped_images = [] cap = cv2.VideoCapture(video_file) frames = [] while(cap.isOpened()): ret, frame = cap.read() if ret==True: frames.append(frame) if cv2.waitKey(1) & 0xFF == ord('q'): break else: break cap.release() for i in range(1, sequence_length+1): frame = frames[i] image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = pImage.fromarray(image, 'RGB') image_name = video_file_name_only+"_preprocessed_"+str(i)+'.png' if settings.DEBUG: image_path = os.path.join(settings.PROJECT_DIR, 'uploaded_images', image_name) else: print("image_name",image_name) image_path = "/home/app/staticfiles" + image_name img.save(image_path) preprocessed_images.append(image_name) print("<=== | Videos Splitting Done | ===>") print("--- %s seconds ---" % (time.time() - start_time)) # End: Displaying preprocessing images # Start: Displaying Faces Cropped Images print("<=== | Started Face Cropping Each Frame | ===>") padding = 40 faces_found = 0 for i in range(1, sequence_length+1): frame = frames[i] #fig, ax = plt.subplots(1,1, figsize=(5, 5)) face_locations = face_recognition.face_locations(frame) if len(face_locations) == 0: continue top, right, bottom, left = face_locations[0] frame_face = frame[top-padding:bottom+padding, left-padding:right+padding] image = cv2.cvtColor(frame_face, cv2.COLOR_BGR2RGB) img = pImage.fromarray(image, 'RGB') image_name = video_file_name_only+"_cropped_faces_"+str(i)+'.png' if settings.DEBUG: image_path = os.path.join(settings.PROJECT_DIR, 'uploaded_images', video_file_name_only+"_cropped_faces_"+str(i)+'.png') else: image_path = "/home/app/staticfiles" + image_name img.save(image_path) faces_found = faces_found + 1 faces_cropped_images.append(image_name) print("<=== | Face Cropping Each Frame Done | ===>") print("--- %s seconds ---" % (time.time() - start_time)) # No face is detected if faces_found == 0: return render(request, predict_template_name, {"no_faces": True}) # End: Displaying Faces Cropped Images try: heatmap_images = [] for i in range(0, len(path_to_videos)): output = "" print("<=== | Started Predicition | ===>") prediction = predict(model, video_dataset[i], './', video_file_name_only) confidence = round(prediction[1], 1) print("<=== | Predicition Done | ===>") # print("<=== | Heat map creation started | ===>") # for j in range(0, sequence_length): # heatmap_images.append(plot_heat_map(j, model, video_dataset[i], './', video_file_name_only)) if prediction[0] == 1: output = "REAL" else: output = "FAKE" print("Prediction : " , prediction[0],"==",output ,"Confidence : " , confidence) print("--- %s seconds ---" % (time.time() - start_time)) if settings.DEBUG: return render(request, predict_template_name, {'preprocessed_images': preprocessed_images, 'heatmap_images': heatmap_images, "faces_cropped_images": faces_cropped_images, "original_video": video_file_name, "models_location": models_location, "output": output, "confidence": confidence}) else: return render(request, predict_template_name, {'preprocessed_images': preprocessed_images, 'heatmap_images': heatmap_images, "faces_cropped_images": faces_cropped_images, "original_video": production_video_name, "models_location": models_location, "output": output, "confidence": confidence}) except: return render(request, 'cuda_full.html') def about(request): return render(request, about_template_name) def handler404(request,exception): return render(request, '404.html', status=404) def cuda_full(request): return render(request, 'cuda_full.html')