import numpy as np import math import cv2 import torch from scipy import signal from scipy.fft import fft from scipy.signal import butter, filtfilt # from facenet_pytorch import MTCNN from face_detection import FaceDetection import joblib def butter_bandpass(sig, lowcut, highcut, fs, order=2): # butterworth bandpass filter sig = np.reshape(sig, -1) nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq b, a = butter(order, [low, high], btype='band') y = filtfilt(b, a, sig) return y def face_detection(video_list): device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') mtcnn = MTCNN(device=device) face_list = [] for t, frame in enumerate(video_list): if t == 0: boxes, _, = mtcnn.detect( frame) # we only detect face bbox in the first frame, keep it in the following frames. if t == 0: box_len = np.max([boxes[0, 2] - boxes[0, 0], boxes[0, 3] - boxes[0, 1]]) box_half_len = np.round(box_len / 2 * 1.1).astype('int') box_mid_y = np.round((boxes[0, 3] + boxes[0, 1]) / 2).astype('int') box_mid_x = np.round((boxes[0, 2] + boxes[0, 0]) / 2).astype('int') cropped_face = frame[box_mid_y - box_half_len:box_mid_y + box_half_len, box_mid_x - box_half_len:box_mid_x + box_half_len] cropped_face = cv2.resize(cropped_face, (128, 128)) face_list.append(cropped_face) print('face detection %2d' % (100 * (t + 1) / len(video_list)), '%', end='\r', flush=True) face_list = np.array(face_list) # (T, H, W, C) face_list = np.transpose(face_list, (3, 0, 1, 2)) # (C, T, H, W) face_list = np.array(face_list)[np.newaxis] return face_list def face_detection_ROI(face_detection, frame_list): face_frame_list = [] ROI1_list = [] ROI2_list = [] for i in range(0, frame_list.shape[0]): frame = frame_list[i] frame, face_frame, ROI1, ROI2, status, mask, face_region = FaceDetection.face_detect(face_detection, frame) face_frame_list.append(face_frame) ROI1_list.append(ROI1) ROI2_list.append(ROI2) return np.array(face_frame_list), np.array(ROI1_list), np.array(ROI2_list), status, face_region def butter_bandpass(sig, lowcut, highcut, fs, order=2): # butterworth bandpass filter sig = np.reshape(sig, -1) nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq b, a = butter(order, [low, high], btype='band') y = filtfilt(b, a, sig) return y def hr_fft(sig, fs, harmonics_removal=True): # get heart rate by FFT # return both heart rate and PSD sig = sig.reshape(-1) sig = sig * signal.windows.hann(sig.shape[0]) sig_f = np.abs(fft(sig)) low_idx = np.round(0.6 / fs * sig.shape[0]).astype('int') high_idx = np.round(4 / fs * sig.shape[0]).astype('int') sig_f_original = sig_f.copy() sig_f[:low_idx] = 0 sig_f[high_idx:] = 0 peak_idx, _ = signal.find_peaks(sig_f) sort_idx = np.argsort(sig_f[peak_idx]) sort_idx = sort_idx[::-1] peak_idx1 = peak_idx[sort_idx[0]] peak_idx2 = peak_idx[sort_idx[1]] f_hr1 = peak_idx1 / sig_f.shape[0] * fs hr1 = f_hr1 * 60 f_hr2 = peak_idx2 / sig_f.shape[0] * fs hr2 = f_hr2 * 60 if harmonics_removal: if np.abs(hr1-2*hr2)<10: hr = hr2 else: hr = hr1 else: hr = hr1 x_hr = np.arange(len(sig_f))/len(sig_f)*fs*60 return hr, sig_f_original, x_hr def hr_fft_2(processed, fps): L = len(processed) fps = 30#float(L) / (times[-1] - times[-L]) # calculate HR using a true fps of processor of the computer, not the fps the camera provide LEN = int(fps * 1.55) # even_times = np.linspace(times[-L], times[-1], LEN) processed = signal.detrend(processed) # detrend the signal to avoid interference of light change # interpolated = np.interp(even_times, times[-L:], processed) # interpolation by 1 interpolated = processed # interpolated = np.hamming(LEN) * interpolated # make the signal become more periodic (advoid spectral leakage) norm = (interpolated - np.mean(interpolated))/np.std(interpolated)#normalization # norm = interpolated / np.linalg.norm(interpolated) raw = np.fft.rfft(norm * 30) # do real fft with the normalization multiplied by 10 raw_r = raw.copy() freqs = float(fps) / LEN * np.arange(LEN / 2 + 1) freqs = 60. * freqs idx_remove = np.where((freqs < 50) & (freqs > 180)) raw[idx_remove] = 0 fft = np.abs(raw) ** 2 # get amplitude spectrum idx = np.where((freqs > 50) & (freqs < 180)) # the range of frequency that HR is supposed to be within pruned = fft[idx] pfreq = freqs[idx] # freqs = pfreq fft = pruned idx2 = np.argmax(pruned) # max in the range can be HR bpm = pfreq[idx2] # calculate Respiratory Rate, 计算呼吸率 idx_remove = np.where((freqs < 5) & (freqs > 60)) raw_r[idx_remove] = 0 fft_r = np.abs(raw_r) ** 2 idx = np.where((freqs > 5) & (freqs < 60)) pruned_r = fft_r[idx] pfreq = freqs[idx] idx3 = np.argmax(pruned_r) pruned_r[idx3] = 0 idx3 = np.argmax(pruned_r) respiratory_rate = pfreq[idx3] return bpm, respiratory_rate def calc_rr(peaklist, sample_rate, working_data={}): peaklist = np.array(peaklist) #cast numpy array to be sure or correct array type working_data['peaklist'] = peaklist # Make sure, peaklist is always an np.array rr_list = (np.diff(peaklist) / sample_rate) * 1000.0 rr_indices = [(peaklist[i], peaklist[i+1]) for i in range(len(peaklist) - 1)] rr_diff = np.abs(np.diff(rr_list)) rr_sqdiff = np.power(rr_diff, 2) working_data['RR_list'] = rr_list working_data['RR_indices'] = rr_indices working_data['RR_diff'] = rr_diff working_data['RR_sqdiff'] = rr_sqdiff return working_data def calculate_hrv(ippg, fps): peak_array, _ = signal.find_peaks(ippg[-200::2]) # down sample rate: 2 peak_list = peak_array.tolist() result = calc_rr(peak_list, fps/2) # print(peak_list) # RR_list = result['RR_list'].tolist() # RR_diff = result['RR_diff'].tolist() # print(RR_list) # print(RR_diff) RR_std = 0 if len(result['RR_list']) > 0: RR_std = np.std(result['RR_list'], ddof=1) # calculate RR interval standard deviation if math.isnan(RR_std): RR_std = 0 return RR_std def RGB_SpO2(ROI_list): roi_avg = [] roi_std = [] for i in range(len(ROI_list)): roi_avg.append(np.average(ROI_list[i], axis=(0, 1))) roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1)) roi_avg = np.array(roi_avg) roi_std = np.array(roi_std) mean_red = roi_avg[:, 0] std_red = roi_std[:, 0] mean_green = roi_avg[:, 1] std_green = roi_std[:, 1] mean_blue = roi_avg[:, 2] std_blue = roi_std[:, 2] A = -11.2 B = 109.3 R = (np.average(mean_red) / np.average(std_red)) / (np.average(mean_blue) / np.average(std_blue)) SpO2 = A * R + B return SpO2 def RGB_HR(ROI_list): roi_avg = [] roi_std = [] for i in range(len(ROI_list)): roi_avg.append(np.average(ROI_list[i], axis=(0, 1))) roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1)) roi_avg = np.array(roi_avg) roi_std = np.array(roi_std) mean_red = roi_avg[:, 0] std_red = roi_std[:, 0] mean_green = roi_avg[:, 1] std_green = roi_std[:, 1] mean_blue = roi_avg[:, 2] std_blue = roi_std[:, 2] ippg_chanel_data = np.array((mean_red,std_red,mean_green,std_green,mean_blue,std_blue)).T print(ippg_chanel_data ) # ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6) HR_pred = [] model_list = joblib.load( './model_weight/lgb_model_threechanel2HR.pkl') for model in model_list: result = model.predict(ippg_chanel_data) HR_pred.append(result+10) HR = np.mean(HR_pred, axis=0) return np.mean(HR)