import argparse
import datetime
import json
import os

import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.compat.v1.keras.backend import set_session

from facial_analysis import FacialImageProcessing


class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to native Python types."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
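
# Why the encoder is needed: NumPy scalars such as np.int64 are not JSON
# serialisable out of the box (the values below are illustrative):
#
#   json.dumps({"count": np.int64(3)})                 # raises TypeError
#   json.dumps({"count": np.int64(3)}, cls=NpEncoder)  # '{"count": 3}'
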
def initialize():
    """Configure a TF1-compat session so the GPU allocates memory on demand."""
    config = tf.compat.v1.ConfigProto()
    config.gpu_options.allow_growth = True
    sess = tf.compat.v1.Session(config=config)
    set_session(sess)
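
# A TF2-native alternative to the compat-session setup above (a sketch, not
# used by this script):
#
#   for gpu in tf.config.list_physical_devices('GPU'):
#       tf.config.experimental.set_memory_growth(gpu, True)
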
def mobilenet_preprocess_input(x, **kwargs):
    """Caffe-style preprocessing: subtract the per-channel ImageNet means.

    Note that x is modified in place and must be a float array.
    """
    x[..., 0] -= 103.939
    x[..., 1] -= 116.779
    x[..., 2] -= 123.68
    return x
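
# A quick sanity check of the mean subtraction (values are arbitrary):
#
#   x = np.full((1, 1, 3), 128.0)
#   mobilenet_preprocess_input(x)
#   # -> array([[[24.061, 11.221,  4.32 ]]])
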
def detect_emotion(frame_bgr, imgProcessing, model):
    """Detect faces in a BGR frame and classify the emotion of each face.

    Returns the annotated frame (BGR) and a dict with per-face predictions.
    """
    INPUT_SIZE = (224, 224)
    idx_to_class = {0: 'Anger', 1: 'Disgust', 2: 'Fear',
                    3: 'Happiness', 4: 'Neutral', 5: 'Sadness', 6: 'Surprise'}
    frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    bounding_boxes, points = imgProcessing.detect_faces(frame)
    points = points.T
    detections = {"id": str(datetime.datetime.now()), "faces": []}
    for bbox, _ in zip(bounding_boxes, points):
        face_pred = {}
        box = bbox.astype(int)  # np.int was removed in NumPy 1.24
        x1, y1, x2, y2 = box[0:4]
        face_img = frame[y1:y2, x1:x2, :]
        try:
            face_img = cv2.resize(face_img, INPUT_SIZE)
        except cv2.error:
            # An empty or degenerate crop cannot be resized; skip this face.
            continue
        inp = mobilenet_preprocess_input(face_img.astype(np.float32))
        inp = np.expand_dims(inp, axis=0)
        scores = model.predict(inp)[0]
        top = int(np.argmax(scores))
        frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 9, 12), 4)
        cv2.putText(frame, '%s %.2f' % (idx_to_class[top], scores[top]),
                    (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
        face_pred["face_bbox"] = [x1, y1, x2, y2]
        face_pred["emotion_predicted"] = idx_to_class[top]
        face_pred["scores"] = {idx_to_class[i]: scores[i] for i in range(len(scores))}
        detections["faces"].append(face_pred)
    # Convert back so downstream OpenCV display/writing gets BGR again.
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    return frame, detections
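
# Minimal single-image sketch (the file names below are illustrative):
#
#   proc = FacialImageProcessing(False)
#   net = load_model('./models/affectnet_emotions/mobilenet_7.h5')
#   img = cv2.imread('face.jpg')  # OpenCV reads images as BGR
#   annotated, result = detect_emotion(img, proc, net)
#   cv2.imwrite('face_annotated.jpg', annotated)
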
def process_video(video):
    """Run emotion detection frame by frame and save the annotated video and JSON."""
    basename = os.path.basename(video)
    name_only = os.path.splitext(basename)[0]
    video_outputpath = os.path.join('./output', basename)
    json_outputpath = os.path.join('./output', name_only + '.json')
    # Load the face detector and emotion model once, not once per frame.
    imgProcessing = FacialImageProcessing(False)
    model = load_model('./models/affectnet_emotions/mobilenet_7.h5')
    videocap = cv2.VideoCapture(video)
    ret, frame = videocap.read()
    if not ret:
        raise IOError('Could not read video: %s' % video)
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
    fps = videocap.get(cv2.CAP_PROP_FPS) or 24.0  # fall back if FPS is unknown
    size = (frame.shape[1], frame.shape[0])
    out = cv2.VideoWriter(video_outputpath, fourcc, fps, size)
    max_frames = 50  # cap on processed frames so long videos finish quickly
    cnt = 0
    all_detections = []
    while ret and cnt < max_frames:
        processed_frame, detections = detect_emotion(frame, imgProcessing, model)
        all_detections.append(detections)
        cv2.imshow('img', processed_frame)
        out.write(processed_frame)
        ret, frame = videocap.read()
        cv2.waitKey(1)
        cnt += 1
    # Dump all per-frame detections once, so the file is a single valid JSON document.
    with open(json_outputpath, 'w') as jsonfile:
        json.dump(all_detections, jsonfile, indent=4, cls=NpEncoder)
    videocap.release()
    out.release()
    cv2.destroyAllWindows()
    return out


def main():
    parser = argparse.ArgumentParser(description='Analysis of Video')
    parser.add_argument(
        '-v', '--video', help='Video to be analysed', required=True)
    args = parser.parse_args()
    initialize()
    process_video(args.video)


if __name__ == '__main__':
    main()
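
# Typical invocation (the script filename is illustrative):
#
#   python emotion_video.py -v input.mp4
#
# The annotated video is written to ./output/input.mp4 and the per-frame
# detections to ./output/input.json; the ./output directory must already exist.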