Spaces:
Runtime error
Runtime error
import numpy as np | |
import math | |
import cv2 | |
import torch | |
from scipy import signal | |
from scipy.fft import fft | |
from scipy.signal import butter, filtfilt | |
# from facenet_pytorch import MTCNN | |
from face_detection import FaceDetection | |
import joblib | |
def butter_bandpass(sig, lowcut, highcut, fs, order=2): | |
# butterworth bandpass filter | |
sig = np.reshape(sig, -1) | |
nyq = 0.5 * fs | |
low = lowcut / nyq | |
high = highcut / nyq | |
b, a = butter(order, [low, high], btype='band') | |
y = filtfilt(b, a, sig) | |
return y | |
def face_detection(video_list): | |
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') | |
mtcnn = MTCNN(device=device) | |
face_list = [] | |
for t, frame in enumerate(video_list): | |
if t == 0: | |
boxes, _, = mtcnn.detect( | |
frame) # we only detect face bbox in the first frame, keep it in the following frames. | |
if t == 0: | |
box_len = np.max([boxes[0, 2] - boxes[0, 0], boxes[0, 3] - boxes[0, 1]]) | |
box_half_len = np.round(box_len / 2 * 1.1).astype('int') | |
box_mid_y = np.round((boxes[0, 3] + boxes[0, 1]) / 2).astype('int') | |
box_mid_x = np.round((boxes[0, 2] + boxes[0, 0]) / 2).astype('int') | |
cropped_face = frame[box_mid_y - box_half_len:box_mid_y + box_half_len, | |
box_mid_x - box_half_len:box_mid_x + box_half_len] | |
cropped_face = cv2.resize(cropped_face, (128, 128)) | |
face_list.append(cropped_face) | |
print('face detection %2d' % (100 * (t + 1) / len(video_list)), '%', end='\r', flush=True) | |
face_list = np.array(face_list) # (T, H, W, C) | |
face_list = np.transpose(face_list, (3, 0, 1, 2)) # (C, T, H, W) | |
face_list = np.array(face_list)[np.newaxis] | |
return face_list | |
def face_detection_ROI(face_detection, frame_list): | |
face_frame_list = [] | |
ROI1_list = [] | |
ROI2_list = [] | |
for i in range(0, frame_list.shape[0]): | |
frame = frame_list[i] | |
frame, face_frame, ROI1, ROI2, status, mask, face_region = FaceDetection.face_detect(face_detection, frame) | |
face_frame_list.append(face_frame) | |
ROI1_list.append(ROI1) | |
ROI2_list.append(ROI2) | |
return np.array(face_frame_list), np.array(ROI1_list), np.array(ROI2_list), status, face_region | |
def butter_bandpass(sig, lowcut, highcut, fs, order=2): | |
# butterworth bandpass filter | |
sig = np.reshape(sig, -1) | |
nyq = 0.5 * fs | |
low = lowcut / nyq | |
high = highcut / nyq | |
b, a = butter(order, [low, high], btype='band') | |
y = filtfilt(b, a, sig) | |
return y | |
def hr_fft(sig, fs, harmonics_removal=True): | |
# get heart rate by FFT | |
# return both heart rate and PSD | |
sig = sig.reshape(-1) | |
sig = sig * signal.windows.hann(sig.shape[0]) | |
sig_f = np.abs(fft(sig)) | |
low_idx = np.round(0.6 / fs * sig.shape[0]).astype('int') | |
high_idx = np.round(4 / fs * sig.shape[0]).astype('int') | |
sig_f_original = sig_f.copy() | |
sig_f[:low_idx] = 0 | |
sig_f[high_idx:] = 0 | |
peak_idx, _ = signal.find_peaks(sig_f) | |
sort_idx = np.argsort(sig_f[peak_idx]) | |
sort_idx = sort_idx[::-1] | |
peak_idx1 = peak_idx[sort_idx[0]] | |
peak_idx2 = peak_idx[sort_idx[1]] | |
f_hr1 = peak_idx1 / sig_f.shape[0] * fs | |
hr1 = f_hr1 * 60 | |
f_hr2 = peak_idx2 / sig_f.shape[0] * fs | |
hr2 = f_hr2 * 60 | |
if harmonics_removal: | |
if np.abs(hr1-2*hr2)<10: | |
hr = hr2 | |
else: | |
hr = hr1 | |
else: | |
hr = hr1 | |
x_hr = np.arange(len(sig_f))/len(sig_f)*fs*60 | |
return hr, sig_f_original, x_hr | |
def hr_fft_2(processed, fps): | |
L = len(processed) | |
fps = 30#float(L) / (times[-1] - times[-L]) # calculate HR using a true fps of processor of the computer, not the fps the camera provide | |
LEN = int(fps * 1.55) | |
# even_times = np.linspace(times[-L], times[-1], LEN) | |
processed = signal.detrend(processed) # detrend the signal to avoid interference of light change | |
# interpolated = np.interp(even_times, times[-L:], processed) # interpolation by 1 | |
interpolated = processed | |
# interpolated = np.hamming(LEN) * interpolated # make the signal become more periodic (advoid spectral leakage) | |
norm = (interpolated - np.mean(interpolated))/np.std(interpolated)#normalization | |
# norm = interpolated / np.linalg.norm(interpolated) | |
raw = np.fft.rfft(norm * 30) # do real fft with the normalization multiplied by 10 | |
raw_r = raw.copy() | |
freqs = float(fps) / LEN * np.arange(LEN / 2 + 1) | |
freqs = 60. * freqs | |
idx_remove = np.where((freqs < 50) & (freqs > 180)) | |
raw[idx_remove] = 0 | |
fft = np.abs(raw) ** 2 # get amplitude spectrum | |
idx = np.where((freqs > 50) & (freqs < 180)) # the range of frequency that HR is supposed to be within | |
pruned = fft[idx] | |
pfreq = freqs[idx] | |
# freqs = pfreq | |
fft = pruned | |
idx2 = np.argmax(pruned) # max in the range can be HR | |
bpm = pfreq[idx2] | |
# calculate Respiratory Rate, 计算呼吸率 | |
idx_remove = np.where((freqs < 5) & (freqs > 60)) | |
raw_r[idx_remove] = 0 | |
fft_r = np.abs(raw_r) ** 2 | |
idx = np.where((freqs > 5) & (freqs < 60)) | |
pruned_r = fft_r[idx] | |
pfreq = freqs[idx] | |
idx3 = np.argmax(pruned_r) | |
pruned_r[idx3] = 0 | |
idx3 = np.argmax(pruned_r) | |
respiratory_rate = pfreq[idx3] | |
return bpm, respiratory_rate | |
def calc_rr(peaklist, sample_rate, working_data={}): | |
peaklist = np.array(peaklist) #cast numpy array to be sure or correct array type | |
working_data['peaklist'] = peaklist # Make sure, peaklist is always an np.array | |
rr_list = (np.diff(peaklist) / sample_rate) * 1000.0 | |
rr_indices = [(peaklist[i], peaklist[i+1]) for i in range(len(peaklist) - 1)] | |
rr_diff = np.abs(np.diff(rr_list)) | |
rr_sqdiff = np.power(rr_diff, 2) | |
working_data['RR_list'] = rr_list | |
working_data['RR_indices'] = rr_indices | |
working_data['RR_diff'] = rr_diff | |
working_data['RR_sqdiff'] = rr_sqdiff | |
return working_data | |
def calculate_hrv(ippg, fps): | |
peak_array, _ = signal.find_peaks(ippg[-200::2]) # down sample rate: 2 | |
peak_list = peak_array.tolist() | |
result = calc_rr(peak_list, fps/2) | |
# print(peak_list) | |
# RR_list = result['RR_list'].tolist() | |
# RR_diff = result['RR_diff'].tolist() | |
# print(RR_list) | |
# print(RR_diff) | |
RR_std = 0 | |
if len(result['RR_list']) > 0: | |
RR_std = np.std(result['RR_list'], ddof=1) # calculate RR interval standard deviation | |
if math.isnan(RR_std): | |
RR_std = 0 | |
return RR_std | |
def RGB_SpO2(ROI_list): | |
roi_avg = [] | |
roi_std = [] | |
for i in range(len(ROI_list)): | |
roi_avg.append(np.average(ROI_list[i], axis=(0, 1))) | |
roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1)) | |
roi_avg = np.array(roi_avg) | |
roi_std = np.array(roi_std) | |
mean_red = roi_avg[:, 0] | |
std_red = roi_std[:, 0] | |
mean_green = roi_avg[:, 1] | |
std_green = roi_std[:, 1] | |
mean_blue = roi_avg[:, 2] | |
std_blue = roi_std[:, 2] | |
A = -11.2 | |
B = 109.3 | |
R = (np.average(mean_red) / np.average(std_red)) / (np.average(mean_blue) / np.average(std_blue)) | |
SpO2 = A * R + B | |
return SpO2 | |
def RGB_HR(ROI_list): | |
roi_avg = [] | |
roi_std = [] | |
for i in range(len(ROI_list)): | |
roi_avg.append(np.average(ROI_list[i], axis=(0, 1))) | |
roi_std.append(np.std(ROI_list[i], axis=(0, 1), ddof=1)) | |
roi_avg = np.array(roi_avg) | |
roi_std = np.array(roi_std) | |
mean_red = roi_avg[:, 0] | |
std_red = roi_std[:, 0] | |
mean_green = roi_avg[:, 1] | |
std_green = roi_std[:, 1] | |
mean_blue = roi_avg[:, 2] | |
std_blue = roi_std[:, 2] | |
ippg_chanel_data = np.array((mean_red,std_red,mean_green,std_green,mean_blue,std_blue)).T | |
print(ippg_chanel_data ) | |
# ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6) | |
HR_pred = [] | |
model_list = joblib.load( './model_weight/lgb_model_threechanel2HR.pkl') | |
for model in model_list: | |
result = model.predict(ippg_chanel_data) | |
HR_pred.append(result+10) | |
HR = np.mean(HR_pred, axis=0) | |
return np.mean(HR) |