import numpy as np
import math
import cv2
import torch
from scipy import signal
from scipy.fft import fft
from scipy.signal import butter, filtfilt
# from facenet_pytorch import MTCNN
from face_detection import FaceDetection
import joblib
def butter_bandpass(sig, lowcut, highcut, fs, order=2):
# butterworth bandpass filter
sig = np.reshape(sig, -1)
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='band')
y = filtfilt(b, a, sig)
return y
def face_detection(video_list):
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
mtcnn = MTCNN(device=device)
face_list = []
for t, frame in enumerate(video_list):
if t == 0:
boxes, _, = mtcnn.detect(
frame) # we only detect face bbox in the first frame, keep it in the following frames.
if t == 0:
box_len = np.max([boxes[0, 2] - boxes[0, 0], boxes[0, 3] - boxes[0, 1]])
box_half_len = np.round(box_len / 2 * 1.1).astype('int')
box_mid_y = np.round((boxes[0, 3] + boxes[0, 1]) / 2).astype('int')
box_mid_x = np.round((boxes[0, 2] + boxes[0, 0]) / 2).astype('int')
cropped_face = frame[box_mid_y - box_half_len:box_mid_y + box_half_len,
box_mid_x - box_half_len:box_mid_x + box_half_len]
cropped_face = cv2.resize(cropped_face, (128, 128))
print('face detection %2d' % (100 * (t + 1) / len(video_list)), '%', end='\r', flush=True)
face_list = np.array(face_list) # (T, H, W, C)
face_list = np.transpose(face_list, (3, 0, 1, 2)) # (C, T, H, W)
face_list = np.array(face_list)[np.newaxis]
return face_list
def face_detection_ROI(face_detection, frame_list):
face_frame_list = []
ROI1_list = []
ROI2_list = []
for i in range(0, frame_list.shape[0]):
frame = frame_list[i]
frame, face_frame, ROI1, ROI2, status, mask, face_region = FaceDetection.face_detect(face_detection, frame)
return np.array(face_frame_list), np.array(ROI1_list), np.array(ROI2_list), status, face_region
def hr_fft(sig, fs, harmonics_removal=True):
# get heart rate by FFT
# return both heart rate and PSD
sig = sig.reshape(-1)
sig = sig *[0])
sig_f = np.abs(fft(sig))
low_idx = np.round(0.6 / fs * sig.shape[0]).astype('int')
high_idx = np.round(4 / fs * sig.shape[0]).astype('int')
sig_f_original = sig_f.copy()
sig_f[:low_idx] = 0
sig_f[high_idx:] = 0
peak_idx, _ = signal.find_peaks(sig_f)
sort_idx = np.argsort(sig_f[peak_idx])
sort_idx = sort_idx[::-1]
peak_idx1 = peak_idx[sort_idx[0]]
peak_idx2 = peak_idx[sort_idx[1]]
f_hr1 = peak_idx1 / sig_f.shape[0] * fs
hr1 = f_hr1 * 60
f_hr2 = peak_idx2 / sig_f.shape[0] * fs
hr2 = f_hr2 * 60
if harmonics_removal:
if np.abs(hr1-2*hr2)<10:
hr = hr2
hr = hr1
hr = hr1
x_hr = np.arange(len(sig_f))/len(sig_f)*fs*60
return hr, sig_f_original, x_hr
def hr_fft_2(processed, fps):
L = len(processed)
fps = 30#float(L) / (times[-1] - times[-L]) # calculate HR using a true fps of processor of the computer, not the fps the camera provide
LEN = int(fps * 1.55)
# even_times = np.linspace(times[-L], times[-1], LEN)
processed = signal.detrend(processed) # detrend the signal to avoid interference of light change
# interpolated = np.interp(even_times, times[-L:], processed) # interpolation by 1
interpolated = processed
# interpolated = np.hamming(LEN) * interpolated # make the signal become more periodic (advoid spectral leakage)
norm = (interpolated - np.mean(interpolated))/np.std(interpolated)#normalization
# norm = interpolated / np.linalg.norm(interpolated)
raw = np.fft.rfft(norm * 30) # do real fft with the normalization multiplied by 10
raw_r = raw.copy()
freqs = float(fps) / LEN * np.arange(LEN / 2 + 1)
freqs = 60. * freqs
idx_remove = np.where((freqs < 50) & (freqs > 180))
raw[idx_remove] = 0
fft = np.abs(raw) ** 2 # get amplitude spectrum
idx = np.where((freqs > 50) & (freqs < 180)) # the range of frequency that HR is supposed to be within
pruned = fft[idx]
pfreq = freqs[idx]
# freqs = pfreq
fft = pruned
idx2 = np.argmax(pruned) # max in the range can be HR
bpm = pfreq[idx2]
# calculate Respiratory Rate, 计算呼吸率
idx_remove = np.where((freqs < 5) & (freqs > 60))
raw_r[idx_remove] = 0
fft_r = np.abs(raw_r) ** 2
idx = np.where((freqs > 5) & (freqs < 60))
pruned_r = fft_r[idx]
pfreq = freqs[idx]
idx3 = np.argmax(pruned_r)
pruned_r[idx3] = 0
idx3 = np.argmax(pruned_r)
respiratory_rate = pfreq[idx3]
return bpm, respiratory_rate
def calc_rr(peaklist, sample_rate, working_data={}):
peaklist = np.array(peaklist) #cast numpy array to be sure or correct array type
working_data['peaklist'] = peaklist # Make sure, peaklist is always an np.array
rr_list = (np.diff(peaklist) / sample_rate) * 1000.0
rr_indices = [(peaklist[i], peaklist[i+1]) for i in range(len(peaklist) - 1)]
rr_diff = np.abs(np.diff(rr_list))
rr_sqdiff = np.power(rr_diff, 2)
working_data['RR_list'] = rr_list
working_data['RR_indices'] = rr_indices
working_data['RR_diff'] = rr_diff
working_data['RR_sqdiff'] = rr_sqdiff
return working_data
def calculate_hrv(ippg, fps):
peak_array, _ = signal.find_peaks(ippg[-200::2]) # down sample rate: 2
peak_list = peak_array.tolist()
result = calc_rr(peak_list, fps/2)
# print(peak_list)
# RR_list = result['RR_list'].tolist()
# RR_diff = result['RR_diff'].tolist()
# print(RR_list)
# print(RR_diff)
RR_std = 0
if len(result['RR_list']) > 0:
RR_std = np.std(result['RR_list'], ddof=1) # calculate RR interval standard deviation
if math.isnan(RR_std):
RR_std = 0
return RR_std
def RGB_SpO2(ROI_list):
roi_avg = []
roi_std = []
for i in range(len(ROI_list)):
roi_avg.append(np.average(ROI_list[i], axis=(0, 1)))
roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1))
roi_avg = np.array(roi_avg)
roi_std = np.array(roi_std)
mean_red = roi_avg[:, 0]
std_red = roi_std[:, 0]
mean_green = roi_avg[:, 1]
std_green = roi_std[:, 1]
mean_blue = roi_avg[:, 2]
std_blue = roi_std[:, 2]
A = -11.2
B = 109.3
R = (np.average(mean_red) / np.average(std_red)) / (np.average(mean_blue) / np.average(std_blue))
SpO2 = A * R + B
return SpO2
def RGB_HR(ROI_list):
roi_avg = []
roi_std = []
for i in range(len(ROI_list)):
roi_avg.append(np.average(ROI_list[i], axis=(0, 1)))
roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1))
roi_avg = np.array(roi_avg)
roi_std = np.array(roi_std)
mean_red = roi_avg[:, 0]
std_red = roi_std[:, 0]
mean_green = roi_avg[:, 1]
std_green = roi_std[:, 1]
mean_blue = roi_avg[:, 2]
std_blue = roi_std[:, 2]
ippg_chanel_data = np.array((mean_red,std_red,mean_green,std_green,mean_blue,std_blue)).T
print(ippg_chanel_data )
# ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6)
HR_pred = []
model_list = joblib.load( './code/model_weight/lgb_model_threechanel2HR.pkl')
for model in model_list:
result = model.predict(ippg_chanel_data)
HR = np.mean(HR_pred, axis=0)
return np.mean(HR)