# (Colab notebook export: stray "Spaces:" / "Runtime error" cell-status lines removed)
#before running this, please change the RUNTIME to GPU (Runtime -> Change runtime type -> set hardware accelerator to GPU)
#Mount our google drive | |
#Note : only needed when you have to download the processed data to the environment | |
#download and unzip the data from google drive Colab environment | |
#This code checks whether each video is corrupted.
#If a video is corrupted, delete it.
import glob | |
import torch | |
import torchvision | |
from torchvision import transforms | |
from torch.utils.data import DataLoader | |
from torch.utils.data.dataset import Dataset | |
import os | |
import numpy as np | |
import cv2 | |
import matplotlib.pyplot as plt | |
import face_recognition | |
#Check if the file is corrupted or not | |
def validate_video(vid_path, train_transforms):
    """Decode up to 20 frames from `vid_path`, apply `train_transforms` to
    each, and return them stacked as a (frames, C, H, W) tensor.

    Raises an exception when the video yields no frames at all -- the caller
    relies on that to flag the file as corrupted.
    """
    count = 20
    frames = []
    for frame in frame_extract(vid_path):
        frames.append(train_transforms(frame))
        if len(frames) == count:
            break
    if not frames:
        # An unreadable/corrupted file decodes zero frames; fail loudly so the
        # caller's try/except marks it as corrupted (torch.stack on an empty
        # list would raise anyway, but with a far less helpful message).
        raise ValueError("no frames could be extracted from " + vid_path)
    return torch.stack(frames)[:count]
#extract frames from a video
def frame_extract(path):
    """Yield successive decoded frames (BGR numpy arrays) from the video at
    `path` until the stream ends or decoding fails."""
    cap = cv2.VideoCapture(path)
    try:
        while True:
            ok, image = cap.read()
            if not ok:
                break
            yield image
    finally:
        # Release the decoder even when the consumer stops iterating early
        # (the original leaked one VideoCapture per video).
        cap.release()
# Per-frame preprocessing used while probing each video for decodability.
im_size = 112
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])
video_fil = glob.glob('/content/drive/MyDrive/Face_only_data/*.mp4')
print("Total no of videos :", len(video_fil))
print(video_fil)
count = 0
for i in video_fil:
    try:
        count += 1
        validate_video(i, train_transforms)
    except Exception:
        # Only Exception, not a bare except: a bare clause would also swallow
        # KeyboardInterrupt/SystemExit. Any decode error marks the video bad.
        print("Number of video processed: ", count, " Remaining : ", (len(video_fil) - count))
        print("Corrupted video is : ", i)
        continue
print((len(video_fil) - count))
#to load the preprocessed videos' frame counts into memory
import json | |
import glob | |
import numpy as np | |
import cv2 | |
import copy | |
import random | |
video_files = glob.glob('/content/drive/MyDrive/Face_only_data/*.mp4')
random.shuffle(video_files)
# Keep only videos with at least 100 frames.
# BUGFIX: the original removed items from `video_files` while iterating over
# it, which silently skips the element immediately after every removal.
# Build a filtered list instead, then rebind.
frame_count = []
kept_videos = []
for video_file in video_files:
    cap = cv2.VideoCapture(video_file)
    n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()  # the original leaked one capture per video
    if n_frames < 100:
        continue
    kept_videos.append(video_file)
    frame_count.append(n_frames)
video_files = kept_videos
print("frames are ", frame_count)
print("Total no of video: ", len(frame_count))
print('Average frame per video:', np.mean(frame_count))
# load the video name and labels from csv | |
import torch | |
import torchvision | |
from torchvision import transforms | |
from torch.utils.data import DataLoader | |
from torch.utils.data.dataset import Dataset | |
import os | |
import numpy as np | |
import cv2 | |
import matplotlib.pyplot as plt | |
import face_recognition | |
class video_dataset(Dataset):
    """Map-style dataset yielding (frames, label) pairs for deepfake
    classification.

    Each item is a (sequence_length, C, H, W) tensor of transformed frames
    plus an integer label: 0 for 'fake', 1 for 'real'.

    Args:
        video_names: list of video file paths.
        labels: DataFrame with columns ["filename", "label"].
        sequence_length: number of frames to read per video.
        transform: per-frame transform (e.g. a torchvision Compose).
    """
    def __init__(self, video_names, labels, sequence_length=60, transform=None):
        self.video_names = video_names
        self.labels = labels
        self.transform = transform
        self.count = sequence_length

    def __len__(self):
        return len(self.video_names)

    def __getitem__(self, idx):
        video_path = self.video_names[idx]
        temp_video = video_path.split('/')[-1]
        # BUGFIX: the original indexed into `self.labels` with positions found
        # via the module-level `labels` global; use self.labels throughout.
        row_idx = self.labels.loc[self.labels["filename"] == temp_video].index.values[0]
        label = self.labels.iloc[row_idx, 1]
        if label == 'fake':
            label = 0
        if label == 'real':
            label = 1
        frames = []
        for frame in self.frame_extract(video_path):
            frames.append(self.transform(frame))
            if len(frames) == self.count:
                break
        frames = torch.stack(frames)
        return frames[:self.count], label

    def frame_extract(self, path):
        # Yield decoded frames until the capture is exhausted.
        vidObj = cv2.VideoCapture(path)
        success = 1
        while success:
            success, image = vidObj.read()
            if success:
                yield image
#plot the image | |
def im_plot(tensor):
    """Un-normalize a CHW image tensor (BGR channel order) and show it."""
    img = tensor.cpu().numpy().transpose(1, 2, 0)
    # OpenCV frames are BGR; reorder channels to RGB for matplotlib.
    blue, green, red = cv2.split(img)
    img = cv2.merge((red, green, blue))
    # NOTE(review): these mean/std constants differ from the ones used in
    # train_transforms (0.485.../0.229...) -- confirm which set is intended.
    img = img * [0.22803, 0.22145, 0.216989] + [0.43216, 0.394666, 0.37645]
    img = img * 255.0
    plt.imshow(img.astype(int))
    plt.show()
#count the number of fake and real videos | |
def number_of_real_and_fake_videos(data_list, labels_csv='/content/drive/MyDrive/labels.csv'):
    """Return (real_count, fake_count) for the video paths in `data_list`.

    Args:
        data_list: list of video file paths; only the basename is matched.
        labels_csv: headerless CSV with columns (filename, label); defaults to
            the original hard-coded Drive path for backward compatibility.

    BUGFIX: the original located each row via the module-level `labels`
    DataFrame but read the value from the locally loaded `lab` -- a silent
    mismatch whenever the two differ.  `lab` is now used throughout.
    """
    header_list = ["filename", "label"]
    lab = pd.read_csv(labels_csv, names=header_list)
    fake = 0
    real = 0
    for i in data_list:
        temp_video = i.split('/')[-1]
        row_idx = lab.loc[lab["filename"] == temp_video].index.values[0]
        label = lab.iloc[row_idx, 1]
        if label == 'fake':
            fake += 1
        if label == 'real':
            real += 1
    return real, fake
# load the labels, build the train/validation split, and create DataLoaders
import random
import pandas as pd
from sklearn.model_selection import train_test_split
header_list = ["filename", "label"]
labels = pd.read_csv('/content/drive/MyDrive/labels.csv', names=header_list)
# 75/25 split over the (already shuffled) list of usable videos.
split = int(0.75 * len(video_files))
train_videos = video_files[:split]
valid_videos = video_files[split:]
valid_label = labels[int(0.75 * len(labels)):]
print("train : ", len(train_videos))
print("test : ", len(valid_videos))
# Count each split once; the original called the helper twice per split,
# re-reading the CSV four times in total.
train_real, train_fake = number_of_real_and_fake_videos(train_videos)
test_real, test_fake = number_of_real_and_fake_videos(valid_videos)
print("TRAIN: ", "Real:", train_real, " Fake:", train_fake)
print("TEST: ", "Real:", test_real, " Fake:", test_fake)
im_size = 112
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])
test_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((im_size, im_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])
train_data = video_dataset(train_videos, labels, sequence_length=10, transform=train_transforms)
# NOTE(review): val_data uses train_transforms, as in the original (the two
# Compose pipelines are currently identical) -- confirm intent.
val_data = video_dataset(valid_videos, labels, sequence_length=10, transform=train_transforms)
train_loader = DataLoader(train_data, batch_size=4, shuffle=True, num_workers=4)
valid_loader = DataLoader(val_data, batch_size=4, shuffle=True, num_workers=4)
image, label = train_data[0]
im_plot(image[0, :, :, :])
val_labels = [val_data[i][1] for i in range(len(val_data))]
print(train_data[1][1])
from torch import nn | |
import timm | |
class Model1(nn.Module):
    """Xception backbone + LSTM sequence classifier.

    forward(x) with x of shape (batch, seq, C, H, W) returns
    (feature_map, class_logits).
    """
    def __init__(self, num_classes, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model1, self).__init__()
        model = timm.create_model('xception', pretrained=True)
        # Drop the classification head; keep the convolutional backbone.
        self.model = nn.Sequential(*list(model.children())[:-1])
        # BUGFIX: the original passed `bidirectional` positionally into
        # nn.LSTM's `bias` slot, silently building a bias-free LSTM.  Pass it
        # by keyword, and use batch_first=True because the tensor fed in
        # forward() is shaped (batch, seq, features).
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.size()
        # Fold the sequence dimension into the batch so the CNN sees images.
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        # Reshape to (batch, seq, num_features) for the batch-first LSTM.
        fmap = fmap.view(batch_size, seq_length, -1)
        x_lstm, _ = self.lstm(fmap, None)
        # Average the hidden states over time, then classify.
        return fmap, self.dp(self.linear1(torch.mean(x_lstm, dim=1)))
# Smoke-test Model1 on a dummy batch of one 20-frame clip.
model1 = Model1(2).cuda()
dummy_clip = torch.from_numpy(np.empty((1, 20, 3, 112, 112))).type(torch.cuda.FloatTensor)
a, b = model1(dummy_clip)
import torch | |
from torch.autograd import Variable | |
import time | |
import os | |
import sys | |
import os | |
def train_epoch(epoch, num_epochs, data_loader, model, criterion, optimizer):
    """Run one training epoch over `data_loader`.

    Returns (mean_loss, mean_accuracy) for the epoch and checkpoints the
    model's state dict to Google Drive afterwards.
    """
    model.train()
    losses = AverageMeter()
    accuracies = AverageMeter()
    for i, (inputs, targets) in enumerate(data_loader):
        # Convert once up front.  The original repeated
        # targets.type(torch.cuda.LongTensor) three times and crashed on
        # machines without CUDA.
        if torch.cuda.is_available():
            inputs = inputs.cuda()
            targets = targets.cuda()
        targets = targets.long()
        _, outputs = model(inputs)
        loss = criterion(outputs, targets)
        acc = calculate_accuracy(outputs, targets)
        losses.update(loss.item(), inputs.size(0))
        accuracies.update(acc, inputs.size(0))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        sys.stdout.write(
            "\r[Epoch %d/%d] [Batch %d / %d] [Loss: %f, Acc: %.2f%%]"
            % (
                epoch,
                num_epochs,
                i,
                len(data_loader),
                losses.avg,
                accuracies.avg))
    torch.save(model.state_dict(), '/content/drive/MyDrive/checkpoint1.pt')
    return losses.avg, accuracies.avg
def test(epoch, model, data_loader, criterion):
    """Evaluate `model` on `data_loader` without gradient tracking.

    Returns (true_labels, predicted_labels, mean_loss, mean_accuracy).
    """
    print('Testing')
    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()
    pred = []
    true = []
    with torch.no_grad():
        for i, (inputs, targets) in enumerate(data_loader):
            # Single device/dtype conversion; the original cast to
            # torch.cuda.* unconditionally and broke on CPU-only machines.
            if torch.cuda.is_available():
                inputs = inputs.cuda()
                targets = targets.cuda()
            targets = targets.long()
            _, outputs = model(inputs)
            loss = torch.mean(criterion(outputs, targets))
            acc = calculate_accuracy(outputs, targets)
            _, p = torch.max(outputs, 1)
            true += targets.detach().cpu().numpy().reshape(len(targets)).tolist()
            pred += p.detach().cpu().numpy().reshape(len(p)).tolist()
            losses.update(loss.item(), inputs.size(0))
            accuracies.update(acc, inputs.size(0))
            sys.stdout.write(
                "\r[Batch %d / %d] [Loss: %f, Acc: %.2f%%]"
                % (
                    i,
                    len(data_loader),
                    losses.avg,
                    accuracies.avg
                )
            )
    print('\nAccuracy {}'.format(accuracies.avg))
    return true, pred, losses.avg, accuracies.avg
class AverageMeter(object):
    """Tracks the most recent value and a running weighted average."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        self.val, self.avg = 0, 0
        self.sum, self.count = 0, 0

    def update(self, val, n=1):
        """Record `val` observed `n` times and refresh the running average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
def calculate_accuracy(outputs, targets):
    """Return the top-1 accuracy (as a percentage) of `outputs` vs `targets`."""
    n = targets.size(0)
    _, top1 = outputs.topk(1, 1, True)
    hits = top1.t().eq(targets.view(1, -1)).float().sum().item()
    return 100 * hits / n
import seaborn as sn | |
#Output confusion matrix | |
def print_confusion_matrix(y_true, y_pred):
    """Print the 2x2 confusion matrix cells, draw a heatmap, and report the
    accuracy computed from the matrix."""
    cm = confusion_matrix(y_true, y_pred)
    # NOTE(review): cell naming follows the original's convention with
    # fake=0 / real=1 -- confirm which class is treated as "positive".
    print('True positive = ', cm[0][0])
    print('False positive = ', cm[0][1])
    print('False negative = ', cm[1][0])
    print('True negative = ', cm[1][1])
    print('\n')
    df_cm = pd.DataFrame(cm, range(2), range(2))
    sn.set(font_scale=1.4)  # for label size
    sn.heatmap(df_cm, annot=True, annot_kws={"size": 16})  # font size
    plt.ylabel('Actual label', size=20)
    plt.xlabel('Predicted label', size=20)
    plt.xticks(np.arange(2), ['Fake', 'Real'], size=16)
    plt.yticks(np.arange(2), ['Fake', 'Real'], size=16)
    plt.ylim([2, 0])  # flip the y-axis so row 0 renders at the top
    plt.show()
    total = cm[0][0] + cm[0][1] + cm[1][0] + cm[1][1]
    calculated_acc = (cm[0][0] + cm[1][1]) / total
    print("Calculated Accuracy", calculated_acc * 100)
def plot_loss(train_loss_avg, test_loss_avg, num_epochs):
    """Plot the per-epoch training and validation loss curves."""
    print(num_epochs)
    xs = range(1, num_epochs + 1)
    plt.plot(xs, train_loss_avg, 'g', label='Training loss')
    plt.plot(xs, test_loss_avg, 'b', label='validation loss')
    plt.title('Training and Validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
def plot_accuracy(train_accuracy, test_accuracy, num_epochs):
    """Plot the per-epoch training and validation accuracy curves."""
    xs = range(1, num_epochs + 1)
    plt.plot(xs, train_accuracy, 'g', label='Training accuracy')
    plt.plot(xs, test_accuracy, 'b', label='validation accuracy')
    plt.title('Training and Validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()
from sklearn.metrics import confusion_matrix
# Hyper-parameters for fine-tuning Model1.
lr = 1e-5  # 0.001
num_epochs = 40
optimizer = torch.optim.Adam(model1.parameters(), lr=lr, weight_decay=1e-5)
#class_weights = torch.from_numpy(np.asarray([1,15])).type(torch.FloatTensor).cuda()
#criterion = nn.CrossEntropyLoss(weight = class_weights).cuda()
criterion = nn.CrossEntropyLoss().cuda()
train_loss_avg, train_accuracy = [], []
test_loss_avg, test_accuracy = [], []
for epoch in range(1, num_epochs + 1):
    epoch_loss, epoch_acc = train_epoch(epoch, num_epochs, train_loader, model1, criterion, optimizer)
    train_loss_avg.append(epoch_loss)
    train_accuracy.append(epoch_acc)
    true, pred, tl, t_acc = test(epoch, model1, valid_loader, criterion)
    test_loss_avg.append(tl)
    test_accuracy.append(t_acc)
plot_loss(train_loss_avg, test_loss_avg, len(train_loss_avg))
plot_accuracy(train_accuracy, test_accuracy, len(train_accuracy))
print(confusion_matrix(true, pred))
print_confusion_matrix(true, pred)
from torch import nn | |
import timm | |
class Model2(nn.Module):
    """Inception-v3 backbone + LSTM sequence classifier.

    forward(x) with x of shape (batch, seq, C, H, W) returns
    (feature_map, class_logits).
    """
    def __init__(self, num_classes, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model2, self).__init__()
        # Create the Inception model and remove its classification head.
        model = timm.create_model('inception_v3', pretrained=True)
        self.model = nn.Sequential(*list(model.children())[:-1])
        # BUGFIX: the original passed `bidirectional` positionally into
        # nn.LSTM's `bias` slot, silently building a bias-free LSTM.  Pass it
        # by keyword, and use batch_first=True because the tensor fed in
        # forward() is shaped (batch, seq, features).
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        # Linear layer for classification
        self.linear1 = nn.Linear(2048, num_classes)
        # Adaptive pooling layer (kept for interface parity; unused in forward)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.size()
        # Fold the sequence dimension into the batch for the CNN.
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        # Reshape to (batch, seq, num_features) for the batch-first LSTM.
        fmap = fmap.view(batch_size, seq_length, -1)
        x_lstm, _ = self.lstm(fmap, None)
        # Average the hidden states over time, then classify.
        return fmap, self.dp(self.linear1(torch.mean(x_lstm, dim=1)))
# Smoke-test Model2 on a dummy batch of one 20-frame clip.
model2 = Model2(2).cuda()
dummy_clip = torch.from_numpy(np.empty((1, 20, 3, 112, 112))).type(torch.cuda.FloatTensor)
a, b = model2(dummy_clip)
from sklearn.metrics import confusion_matrix
# Hyper-parameters for fine-tuning Model2.
lr = 1e-5  # 0.001
num_epochs = 50
optimizer = torch.optim.Adam(model2.parameters(), lr=lr, weight_decay=1e-5)
#class_weights = torch.from_numpy(np.asarray([1,15])).type(torch.FloatTensor).cuda()
#criterion = nn.CrossEntropyLoss(weight = class_weights).cuda()
criterion = nn.CrossEntropyLoss().cuda()
train_loss_avg, train_accuracy = [], []
test_loss_avg, test_accuracy = [], []
for epoch in range(1, num_epochs + 1):
    epoch_loss, epoch_acc = train_epoch(epoch, num_epochs, train_loader, model2, criterion, optimizer)
    train_loss_avg.append(epoch_loss)
    train_accuracy.append(epoch_acc)
    true, pred, tl, t_acc = test(epoch, model2, valid_loader, criterion)
    test_loss_avg.append(tl)
    test_accuracy.append(t_acc)
plot_loss(train_loss_avg, test_loss_avg, len(train_loss_avg))
plot_accuracy(train_accuracy, test_accuracy, len(train_accuracy))
print(confusion_matrix(true, pred))
print_confusion_matrix(true, pred)
#Model with feature visualization | |
from torch import nn | |
from torchvision import models | |
class Model3(nn.Module):
    """ResNeXt-50 backbone + LSTM; forward returns (fmap, logits) where
    `fmap` is the conv feature map kept for feature visualization."""
    def __init__(self, num_classes, latent_dim=2048, lstm_layers=1, hidden_dim=2048, bidirectional=False):
        super(Model3, self).__init__()
        model = models.resnext50_32x4d(pretrained=True)
        # Keep everything up to (but excluding) the avgpool + fc head.
        self.model = nn.Sequential(*list(model.children())[:-2])
        # BUGFIX: `bidirectional` was passed positionally into nn.LSTM's
        # `bias` slot.  Pass it by keyword, and make the LSTM batch-first to
        # match the (batch, seq, features) input and the x_lstm[:, -1, :]
        # read in forward().
        self.lstm = nn.LSTM(latent_dim, hidden_dim, lstm_layers,
                            batch_first=True, bidirectional=bidirectional)
        self.relu = nn.LeakyReLU()
        self.dp = nn.Dropout(0.4)
        self.linear1 = nn.Linear(2048, num_classes)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        batch_size, seq_length, c, h, w = x.shape
        # Fold the sequence dimension into the batch for the CNN.
        x = x.view(batch_size * seq_length, c, h, w)
        fmap = self.model(x)
        x = self.avgpool(fmap)
        x = x.view(batch_size, seq_length, 2048)
        x_lstm, _ = self.lstm(x, None)
        # Classify from the final time step's hidden state.
        return fmap, self.dp(self.linear1(x_lstm[:, -1, :]))
# Smoke-test Model3 on a dummy batch of one 20-frame clip.
model3 = Model3(2).cuda()
dummy_clip = torch.from_numpy(np.empty((1, 20, 3, 112, 112))).type(torch.cuda.FloatTensor)
a, b = model3(dummy_clip)
from sklearn.metrics import confusion_matrix
# Hyper-parameters for fine-tuning Model3.
lr = 1e-5  # 0.001
num_epochs = 50
optimizer = torch.optim.Adam(model3.parameters(), lr=lr, weight_decay=1e-5)
#class_weights = torch.from_numpy(np.asarray([1,15])).type(torch.FloatTensor).cuda()
#criterion = nn.CrossEntropyLoss(weight = class_weights).cuda()
criterion = nn.CrossEntropyLoss().cuda()
train_loss_avg, train_accuracy = [], []
test_loss_avg, test_accuracy = [], []
for epoch in range(1, num_epochs + 1):
    epoch_loss, epoch_acc = train_epoch(epoch, num_epochs, train_loader, model3, criterion, optimizer)
    train_loss_avg.append(epoch_loss)
    train_accuracy.append(epoch_acc)
    true, pred, tl, t_acc = test(epoch, model3, valid_loader, criterion)
    test_loss_avg.append(tl)
    test_accuracy.append(t_acc)
plot_loss(train_loss_avg, test_loss_avg, len(train_loss_avg))
plot_accuracy(train_accuracy, test_accuracy, len(train_accuracy))
print(confusion_matrix(true, pred))
print_confusion_matrix(true, pred)
# Evaluate each trained model once on the validation split for ensembling.
# BUGFIX: the list was originally named `models`, shadowing the
# torchvision.models module imported above; use a distinct name.
ensemble_models = [model1, model2]
true1, pred1, tl1, t_acc1 = test(epoch, model1, valid_loader, criterion)
true2, pred2, tl2, t_acc2 = test(epoch, model2, valid_loader, criterion)
true3, pred3, tl3, t_acc3 = test(epoch, model3, valid_loader, criterion)
preds = np.array([pred1, pred2])
# (Abandoned ensembling experiments -- summed/argmax voting and fixed-weight
# accuracy averaging -- removed; the weighted grid search below supersedes
# them.)
print(val_data)
# import numpy as np | |
# # Convert pred1 and pred2 to numpy arrays | |
# pred1_array = np.array(pred1) | |
# pred2_array = np.array(pred2) | |
# val_array = np.array(val_labels) | |
# weighted_pred = (pred2_array + pred1_array) / 2 | |
# # Compute accuracy of the ensemble model | |
# ensemble_accuracy = np.mean(np.argmax(weighted_pred) == val_array) | |
# print("Ensemble accuracy:", ensemble_accuracy) | |
import numpy as np
# Grid-search the ensemble weights over an 11-point grid per model.
weight_range = np.linspace(0, 1, num=11)  # adjust count/range as needed
best_accuracy = 0.0
best_weights = None
for w1 in weight_range:
    for w2 in weight_range:
        for w3 in weight_range:
            total_weight = w1 + w2 + w3
            if total_weight == 0:
                continue
            # BUGFIX: normalize into fresh variables.  The original did
            # `w1 /= total_weight` in place, so every subsequent w3 iteration
            # reused the already-normalized w1/w2 and explored the wrong
            # weight combinations.
            nw1 = w1 / total_weight
            nw2 = w2 / total_weight
            nw3 = w3 / total_weight
            # Weighted average of the three models' validation accuracies.
            weighted_accuracy = nw1 * t_acc1 + nw2 * t_acc2 + nw3 * t_acc3
            # Keep the best combination seen so far.
            if weighted_accuracy > best_accuracy:
                best_accuracy = weighted_accuracy
                best_weights = (nw1, nw2, nw3)
print("Best ensemble accuracy:", best_accuracy)
print("Best weights:", best_weights)
# weight_range = np.linspace(0, 1, num=11) | |
# weight_range | |