pavankumarvk committed on
Commit
3d90c9f
·
verified ·
1 Parent(s): 17f8d7a

Update pipeline.py

Browse files
Files changed (1) hide show
  1. pipeline.py +131 -33
pipeline.py CHANGED
@@ -1,15 +1,19 @@
1
- import os
2
- import cv2
3
- import torch
4
- import zipfile
5
  import librosa
6
- import numpy as np
7
  import tensorflow as tf
8
- from facenet_pytorch import MTCNN
9
  from rawnet import RawNet
10
 
 
 
 
11
  tf.random.set_seed(42)
12
 
 
13
  if not os.path.exists("efficientnet-b0"):
14
  local_zip = "./efficientnet-b0.zip"
15
  if os.path.exists(local_zip):
@@ -18,43 +22,66 @@ if not os.path.exists("efficientnet-b0"):
18
  zip_ref.close()
19
  print("Model extracted successfully!")
20
 
 
 
21
  model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
22
 
 
 
23
  class DetectionPipeline:
24
- def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
 
 
 
 
 
 
 
 
 
 
 
 
25
  self.n_frames = n_frames
26
  self.batch_size = batch_size
27
  self.resize = resize
28
  self.input_modality = input_modality
29
 
30
  def __call__(self, filename):
 
 
 
 
 
31
  if self.input_modality == 'video':
 
32
  v_cap = cv2.VideoCapture(filename)
33
  v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
34
 
35
- sample = np.arange(0, v_len) if self.n_frames is None \
36
- else np.linspace(0, v_len-1, self.n_frames).astype(int)
 
 
 
37
 
 
38
  faces = []
39
  frames = []
40
-
41
  for j in range(v_len):
42
  success = v_cap.grab()
43
-
44
  if j in sample:
 
45
  success, frame = v_cap.retrieve()
46
  if not success:
47
  continue
48
-
49
  frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
50
 
 
51
  if self.resize is not None:
52
- frame = frame.resize(
53
- [int(d * self.resize) for d in frame.size]
54
- )
55
-
56
  frames.append(frame)
57
 
 
58
  if len(frames) % self.batch_size == 0 or j == sample[-1]:
59
  face2 = cv2.resize(frame, (224, 224))
60
  faces.append(face2)
@@ -63,50 +90,121 @@ class DetectionPipeline:
63
  return faces
64
 
65
  elif self.input_modality == 'image':
 
 
 
 
66
  image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
67
  image = cv2.resize(image, (224, 224))
68
- return image
69
 
 
 
 
 
 
70
  elif self.input_modality == 'audio':
 
 
 
71
  x, sr = librosa.load(filename)
72
  x_pt = torch.Tensor(x)
73
- x_pt = torch.unsqueeze(x_pt, dim=0)
74
  return x_pt
75
-
76
  else:
77
- raise ValueError("Invalid modality")
78
 
79
  detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
80
- detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')
81
 
82
  def deepfakes_video_predict(input_video):
83
- faces = detection_video_pipeline(input_video)
84
 
85
- real_res, fake_res = [], []
 
 
 
86
 
87
  for face in faces:
88
- face2 = face / 255
 
89
  pred = model.predict(np.expand_dims(face2, axis=0))[0]
90
  real, fake = pred[0], pred[1]
91
  real_res.append(real)
92
  fake_res.append(fake)
93
 
 
 
 
 
 
 
 
 
94
  real_mean = np.mean(real_res)
95
  fake_mean = np.mean(fake_res)
 
 
 
96
 
97
  if real_mean >= 0.5:
98
- return "The video is REAL. Confidence: " + str(round(100 - real_mean*100, 3)) + "%"
99
  else:
100
- return "The video is FAKE. Confidence: " + str(round(fake_mean*100, 3)) + "%"
101
 
102
- def deepfakes_image_predict(input_image):
103
- face = detection_image_pipeline(input_image)
104
- face2 = face / 255
105
 
106
- pred = model.predict(np.expand_dims(face2, axis=0))[0]
107
- real, fake = pred[0], pred[1]
108
 
 
 
 
 
 
109
  if real > 0.5:
110
- return "The image is REAL."
111
  else:
112
- return "The image is FAKE."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import cv2
3
+ import torch
4
+ import zipfile
5
  import librosa
6
+ import numpy as np
7
  import tensorflow as tf
8
+ from facenet_pytorch import MTCNN
9
  from rawnet import RawNet
10
 
11
+
12
+
13
+ #Set random seed for reproducibility.
14
  tf.random.set_seed(42)
15
 
16
+ # Extract model if not already extracted
17
  if not os.path.exists("efficientnet-b0"):
18
  local_zip = "./efficientnet-b0.zip"
19
  if os.path.exists(local_zip):
 
22
  zip_ref.close()
23
  print("Model extracted successfully!")
24
 
25
+ # Load models.
26
+ # Load model without compiling to avoid optimizer dependency issues
27
  model = tf.keras.models.load_model("efficientnet-b0/", compile=False)
28
 
29
+
30
+
31
  class DetectionPipeline:
32
+ """Pipeline class for detecting faces in the frames of a video file."""
33
+
34
def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality = 'video'):
    """Store the sampling and preprocessing settings used by __call__.

    Keyword Arguments:
        n_frames {int} -- Total number of frames to load. These will be evenly spaced
            throughout the video. If not specified (i.e., None), all frames will be loaded.
            (default: {None})
        batch_size {int} -- In the video branch of __call__, a resized frame is emitted
            whenever the number of collected frames is a multiple of this value (and on
            the last sampled frame). (default: {60})
        resize {float} -- Factor applied to the frame dimensions in the video branch of
            __call__ when it is not None. A value less than 1 results in downsampling
            and a value greater than 1 results in upsampling. (default: {None})
        input_modality {str} -- Selects the branch taken by __call__: 'video', 'image'
            or 'audio'. (default: {'video'})

    NOTE(review): earlier documentation described batch_size as the MTCNN face-detector
    batch size; MTCNN is imported by this file but no visible code calls it -- confirm.
    """
    self.n_frames = n_frames
    self.batch_size = batch_size
    self.resize = resize
    self.input_modality = input_modality
49
 
50
  def __call__(self, filename):
51
+ """Load frames from an MP4 video and detect faces.
52
+ Arguments:
53
+ filename {str} -- Path to video.
54
+ """
55
+ # Create video reader and find length
56
  if self.input_modality == 'video':
57
+ print('Input modality is video.')
58
  v_cap = cv2.VideoCapture(filename)
59
  v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
60
 
61
+ # Pick 'n_frames' evenly spaced frames to sample
62
+ if self.n_frames is None:
63
+ sample = np.arange(0, v_len)
64
+ else:
65
+ sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)
66
 
67
+ # Loop through frames
68
  faces = []
69
  frames = []
 
70
  for j in range(v_len):
71
  success = v_cap.grab()
 
72
  if j in sample:
73
+ # Load frame
74
  success, frame = v_cap.retrieve()
75
  if not success:
76
  continue
 
77
  frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
78
 
79
+ # Resize frame to desired size
80
  if self.resize is not None:
81
+ frame = frame.resize([int(d * self.resize) for d in frame.size])
 
 
 
82
  frames.append(frame)
83
 
84
+ # When batch is full, detect faces and reset frame list
85
  if len(frames) % self.batch_size == 0 or j == sample[-1]:
86
  face2 = cv2.resize(frame, (224, 224))
87
  faces.append(face2)
 
90
  return faces
91
 
92
  elif self.input_modality == 'image':
93
+ print('Input modality is image.')
94
+ #Perform inference for image modality.
95
+ print('Reading image')
96
+ # print(f"Image path is: {filename}")
97
  image = cv2.cvtColor(filename, cv2.COLOR_BGR2RGB)
98
  image = cv2.resize(image, (224, 224))
 
99
 
100
+ # if not face.any():
101
+ # print("No faces found...")
102
+
103
+ return image
104
+
105
  elif self.input_modality == 'audio':
106
+ print("INput modality is audio.")
107
+
108
+ #Load audio.
109
  x, sr = librosa.load(filename)
110
  x_pt = torch.Tensor(x)
111
+ x_pt = torch.unsqueeze(x_pt, dim = 0)
112
  return x_pt
113
+
114
  else:
115
+ raise ValueError("Invalid input modality. Must be either 'video' or image")
116
 
# Shared video pipeline: samples 5 evenly spaced frames; with batch_size=1 every
# sampled frame is resized and returned by the pipeline.
detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
# Shared image pipeline: the image branch of __call__ only converts BGR to RGB and
# resizes to 224x224, so n_frames is left at its default.
detection_image_pipeline = DetectionPipeline(batch_size = 1, input_modality = 'image')
119
 
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    The module-level ``detection_video_pipeline`` turns the video into a list
    of 224x224 RGB frames. The frames are scaled to [0, 1], scored by the
    module-level Keras ``model`` in one batch, and the per-frame scores are
    averaged.

    Arguments:
        input_video {str} -- Path to the video, passed straight to the pipeline.

    Returns:
        str -- Verdict text with a "Deepfakes Confidence" percentage, or a
            short notice when no frame could be read from the video.
    """
    faces = detection_video_pipeline(input_video)

    # Without this guard the means below are NaN, ``NaN >= 0.5`` is False and
    # an unreadable video would be reported as "FAKE ... nan%".
    if len(faces) == 0:
        return "No frames could be read from the video."

    # One batched call instead of one predict() per frame; every frame has
    # already been resized to the same shape by the pipeline.
    batch = np.asarray(faces, dtype=np.float32) / 255
    preds = model.predict(batch)

    # Column 0 is read as the "real" score and column 1 as the "fake" score.
    real_mean = np.mean(preds[:, 0])
    fake_mean = np.mean(preds[:, 1])
    print(f"Real Faces: {real_mean}")
    print(f"Fake Faces: {fake_mean}")

    if real_mean >= 0.5:
        # The reported percentage is the complement of the real score.
        return "The video is REAL. \n Deepfakes Confidence: " + str(round(100 - (real_mean*100), 3)) + "%"
    return "The video is FAKE. \n Deepfakes Confidence: " + str(round(fake_mean*100, 3)) + "%"
 
 
155
 
 
 
156
 
157
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The module-level ``detection_image_pipeline`` converts the image to RGB
    and resizes it to 224x224. The result is scaled to [0, 1] and scored by
    the module-level Keras ``model``.

    Arguments:
        input_image -- Image array handed to the pipeline, which passes it to
            ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.

    Returns:
        str -- Verdict text with a "Deepfakes Confidence" percentage.
    """
    image = detection_image_pipeline(input_image)
    scaled = image / 255
    pred = model.predict(np.expand_dims(scaled, axis=0))[0]
    real, fake = pred[0], pred[1]

    # Inclusive threshold, matching deepfakes_video_predict, so a score of
    # exactly 0.5 gets the same verdict for images and videos.
    if real >= 0.5:
        # The reported percentage is the complement of the real score.
        return "The image is REAL. \n Deepfakes Confidence: " + str(round(100 - (real*100), 3)) + "%"
    return "The image is FAKE. \n Deepfakes Confidence: " + str(round(fake*100, 3)) + "%"
167
+
168
def load_audio_model():
    """Build the RawNet audio classifier and load its weights from 'RawNet2.pth'.

    Returns:
        RawNet -- Model constructed for CPU with the checkpoint weights loaded
            and switched to evaluation mode.

    Raises:
        FileNotFoundError -- If 'RawNet2.pth' is not in the working directory.
        RuntimeError -- If the checkpoint keys do not match the model.
    """
    d_args = {
        "nb_samp": 64600,
        "first_conv": 1024,
        "in_channels": 1,
        "filts": [20, [20, 20], [20, 128], [128, 128]],
        "blocks": [2, 4],
        "nb_fc_node": 1024,
        "gru_node": 1024,
        "nb_gru_layer": 3,
        "nb_classes": 2,
    }

    # Named audio_model so it does not shadow the module-level Keras `model`.
    audio_model = RawNet(d_args=d_args, device='cpu')

    # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
    ckpt = torch.load('RawNet2.pth', map_location=torch.device('cpu'))

    # The second positional parameter of load_state_dict is the `strict` flag,
    # not a target dict. Strict loading (the default) raises on missing or
    # unexpected keys instead of skipping them silently.
    audio_model.load_state_dict(ckpt)

    # Inference only: put dropout / batch-norm layers in evaluation mode.
    # Assumes RawNet is a torch.nn.Module, as its state_dict API suggests.
    audio_model.eval()
    return audio_model
187
+
188
# Display label for each predicted class index of the audio model.
audio_label_map = dict(enumerate(("Real audio", "Fake audio")))
192
+
193
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real or fake.

    Arguments:
        input_audio {tuple} -- Pair of (samples, sample_rate), the order
            returned by ``librosa.load``. A (sample_rate, samples) pair is
            also accepted and reordered.

    Returns:
        str -- The entry of ``audio_label_map`` for the predicted class.
    """
    x, sr = input_audio

    # A scalar in the first slot would make torch.Tensor(<int>) allocate an
    # uninitialised tensor of that length, so treat it as a swapped pair.
    # NOTE(review): confirm which order the caller actually supplies.
    if np.ndim(x) == 0 and np.ndim(sr) > 0:
        x, sr = sr, x

    # Float32 copy of the samples with a leading batch dimension.
    x_pt = torch.tensor(np.asarray(x), dtype=torch.float32).unsqueeze(0)

    # Named audio_model so it does not shadow the module-level Keras `model`.
    audio_model = load_audio_model()
    # Evaluation mode and no autograd tracking: this is inference only.
    audio_model.eval()
    with torch.no_grad():
        scores = audio_model(x_pt)

    # Index of the largest score over the flattened output.
    result = int(np.argmax(scores.detach().numpy()))
    return audio_label_map[result]
210
+